CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_local.cc
Go to the documentation of this file.
1 
5 #include "upload_local.h"
6 
7 #include <errno.h>
8 
9 #include <string>
10 
12 #include "util/logging.h"
13 #include "util/posix.h"
14 
15 namespace upload {
16 
// LocalUploader constructor: member initializer list and body.
// NOTE(review): the line carrying the constructor signature is not part of
// this listing; only the initializers and the body are visible.
//
// Passes spooler_definition on to AbstractUploader, derives the backend file
// and directory modes from default_backend_file_mode_ /
// default_backend_dir_mode_ combined with GetUmask(), and copies the upstream
// and temporary paths out of the spooler definition.
//
// NOTE(review): the modes are computed as `default ^ umask`.  XOR only clears
// a umask bit where the default mode has that bit set; a umask bit that is
// NOT set in the default mode gets switched on instead (for instance
// 0666 ^ 0077 == 0611).  Confirm whether `default & ~GetUmask()` was meant.
18  : AbstractUploader(spooler_definition)
19  , backend_file_mode_(default_backend_file_mode_ ^ GetUmask())
20  , backend_dir_mode_(default_backend_dir_mode_ ^ GetUmask())
21  , upstream_path_(spooler_definition.spooler_configuration)
22  , temporary_path_(spooler_definition.temporary_path) {
    // this uploader only accepts a valid definition of driver type Local
23  assert(spooler_definition.IsValid()
24  && spooler_definition.driver_type == SpoolerDefinition::Local);
25 
    // initialize the error counter that GetNumberOfErrors() reads
26  atomic_init32(&copy_errors_);
27 }
28 
29 bool LocalUploader::WillHandle(const SpoolerDefinition &spooler_definition) {
30  return spooler_definition.driver_type == SpoolerDefinition::Local;
31 }
32 
33 unsigned int LocalUploader::GetNumberOfErrors() const {
34  return atomic_read32(&copy_errors_);
35 }
36 
// Tail of LocalUploader::Create().
// NOTE(review): the signature and the first operand of the returned
// expression are not part of this listing.  The visible operand creates the
// "stats" directory below upstream_path_ with backend_dir_mode_, passing
// false for MkdirDeep's verify_writable argument.
39  && MkdirDeep(upstream_path_ + "/stats", backend_dir_mode_, false);
40 }
41 
/**
 * Copies the content of an ingestion source into the local backend storage.
 *
 * The data is first written to a temporary file created under
 * temporary_path_ and is then handed to Move(), which puts it at remote_path
 * below upstream_path_.  Every failure increments copy_errors_ and is
 * reported through Respond().  Result codes used here:
 *   1    the temporary file could not be created
 *   100  the source could not be opened, or reading/writing failed
 *   rvi  the value returned by Move() (0 on success)
 *
 * NOTE(review): `source` is used throughout, but the parameter line that
 * declares it and the opening lines of two LogCvmfs() calls are not part of
 * this listing.
 *
 * @param remote_path  destination, relative to upstream_path_
 * @param callback     passed to Respond() together with the result
 */
42 void LocalUploader::DoUpload(const std::string &remote_path,
44  const CallbackTN *callback) {
45  LogCvmfs(kLogSpooler, kLogVerboseMsg, "FileUpload call started.");
46 
47  // create destination in backend storage temporary directory
48  std::string tmp_path;
49  FILE *ftmp = CreateTempFile(temporary_path_ + "/upload", 0666, "w",
50  &tmp_path);
51  if (ftmp == NULL) {
53  "failed to create temp path for "
54  "upload of file '%s' (errno: %d)",
55  source->GetPath().c_str(), errno);
56  atomic_inc32(&copy_errors_);
57  Respond(callback, UploaderResults(1, source->GetPath()));
58  return;
59  }
60 
61  // copy file into controlled temporary directory location
62  bool rvb = source->Open();
63  if (!rvb) {
    // nothing was copied yet: drop the empty temporary file again
64  fclose(ftmp);
65  unlink(tmp_path.c_str());
66  atomic_inc32(&copy_errors_);
67  Respond(callback, UploaderResults(100, source->GetPath()));
68  return;
69  }
    // the source is streamed through a stack buffer of kPageSize bytes
70  unsigned char buffer[kPageSize];
71  ssize_t rbytes;
72  do {
73  rbytes = source->Read(buffer, kPageSize);
74  size_t wbytes = 0;
75  if (rbytes > 0) {
76  wbytes = fwrite(buffer, 1, rbytes, ftmp);
77  }
    // a read error (rbytes < 0) or a short write aborts the upload and
    // removes the partially written temporary file
78  if ((rbytes < 0) || (static_cast<size_t>(rbytes) != wbytes)) {
79  source->Close();
80  fclose(ftmp);
81  unlink(tmp_path.c_str());
82  atomic_inc32(&copy_errors_);
83  Respond(callback, UploaderResults(100, source->GetPath()));
84  return;
85  }
    // any read shorter than a full page ends the loop; presumably Read() only
    // returns short at end of input -- TODO confirm against IngestionSource
86  } while (rbytes == kPageSize);
87  source->Close();
    // NOTE(review): the result of fclose() is ignored, so an error while
    // flushing buffered data would go unnoticed here -- confirm acceptable
88  fclose(ftmp);
89 
90  // move the file in place (atomic operation)
91  int rvi = Move(tmp_path, remote_path);
92  if (rvi != 0) {
94  "failed to move file '%s' from the "
95  "staging area to the final location: "
96  "'%s'",
97  tmp_path.c_str(), remote_path.c_str());
98  unlink(tmp_path.c_str());
99  atomic_inc32(&copy_errors_);
100  Respond(callback, UploaderResults(rvi, source->GetPath()));
101  return;
102  }
103 
    // success path: rvi is 0 at this point
104  Respond(callback, UploaderResults(rvi, source->GetPath()));
105 }
106 
// LocalUploader::InitStreamedUpload(): opens a temporary chunk file and wraps
// it in a newly allocated LocalStreamHandle.
// NOTE(review): the first line of the signature is not part of this listing.
//
// Returns NULL (after incrementing copy_errors_) when the temporary file
// cannot be created; otherwise returns a handle constructed from the callback,
// the open file descriptor and the temporary path.  The handle is allocated
// with `new`; the only `delete` visible in this file is at the end of
// FinalizeStreamedUpload().
108  const CallbackTN *callback) {
109  std::string tmp_path;
110  const int tmp_fd = CreateAndOpenTemporaryChunkFile(&tmp_path);
111  if (tmp_fd < 0) {
112  atomic_inc32(&copy_errors_);
113  return NULL;
114  }
115 
116  return new LocalStreamHandle(callback, tmp_fd, tmp_path);
117 }
118 
// LocalUploader::StreamedUpload(): appends one buffer to the temporary file
// behind a stream handle.
// NOTE(review): the first signature line, the opening line of the LogCvmfs()
// call, the second line of the failing Respond() call and the statement
// before the closing brace are not part of this listing.
//
// `handle` is cast to LocalStreamHandle without a runtime check, so callers
// must pass a handle of that type (InitStreamedUpload() creates one).
120  UploadBuffer buffer,
121  const CallbackTN *callback) {
122  LocalStreamHandle *local_handle = static_cast<LocalStreamHandle *>(handle);
123 
    // write() returns ssize_t; storing it in a size_t turns the -1 error value
    // into a huge number, which the inequality below still catches.  A partial
    // write is treated as a failure too.
124  const size_t bytes_written = write(local_handle->file_descriptor, buffer.data,
125  buffer.size);
126  if (bytes_written != buffer.size) {
    // errno is copied right away, before any further call is made
127  const int cpy_errno = errno;
129  "failed to write %lu bytes to '%s' "
130  "(errno: %d)",
131  buffer.size, local_handle->temporary_path.c_str(), cpy_errno);
132  atomic_inc32(&copy_errors_);
133  Respond(callback,
135  return;
136  }
137 
139 }
140 
// LocalUploader::FinalizeStreamedUpload(): closes the temporary file of a
// streamed upload and commits it to its final location.
// NOTE(review): the first signature line, the opening lines of the
// LogCvmfs() calls, the result arguments of the failing Respond() calls, some
// of the statistics statements and the final statement are not part of this
// listing.
//
// Visible steps:
//   1. close the handle's file descriptor;
//   2. pick the final path: the handle's remote_path if it is non-empty,
//      otherwise "data/" + content_hash.MakePath();
//   3. if Peek() finds nothing at that path, Move() the temporary file there;
//      otherwise unlink the temporary file and call CountDuplicates();
//   4. remember the commit callback and delete the handle.
//
// NOTE(review): both early `return`s below leave without deleting
// local_handle, unlike the path that reaches the end -- confirm who releases
// the handle in the error cases.
142  const shash::Any &content_hash) {
143  int retval = 0;
144  LocalStreamHandle *local_handle = static_cast<LocalStreamHandle *>(handle);
145 
146  retval = close(local_handle->file_descriptor);
147  if (retval != 0) {
148  const int cpy_errno = errno;
150  "failed to close temp file '%s' "
151  "(errno: %d)",
152  local_handle->temporary_path.c_str(), cpy_errno);
153  atomic_inc32(&copy_errors_);
154  Respond(handle->commit_callback,
156  return;
157  }
158 
159  std::string final_path;
160  if (local_handle->remote_path != "") {
161  final_path = local_handle->remote_path;
162  } else {
163  final_path = "data/" + content_hash.MakePath();
164  }
    // only store the object if nothing exists at the final path yet
165  if (!Peek(final_path)) {
166  retval = Move(local_handle->temporary_path, final_path);
167  if (retval != 0) {
    // NOTE(review): Move() already returns its error code in retval (101 or
    // the errno of rename); errno is read again here after Move() has
    // returned -- confirm it still holds the intended value
168  const int cpy_errno = errno;
170  "failed to move temp file '%s' to "
171  "final location '%s' (errno: %d)",
172  local_handle->temporary_path.c_str(), final_path.c_str(),
173  cpy_errno);
174  atomic_inc32(&copy_errors_);
175  Respond(handle->commit_callback,
177  return;
178  }
    // statistics: hashes without suffix and partial chunks add their on-disk
    // size via CountUploadedBytes(); the statements of the catalog branch are
    // not visible in this listing
179  if (!content_hash.HasSuffix()
180  || content_hash.suffix == shash::kSuffixPartial) {
182  CountUploadedBytes(GetFileSize(upstream_path_ + "/" + final_path));
183  } else if (content_hash.suffix == shash::kSuffixCatalog) {
186  }
187  } else {
    // the object is already stored: remove the temporary copy; a failed
    // unlink is handled inside the if but is not counted in copy_errors_.
    // This inner `retval` shadows the function-level variable of that name.
188  const int retval = unlink(local_handle->temporary_path.c_str());
189  if (retval != 0) {
191  "failed to remove temporary file '%s' (errno: %d)",
192  local_handle->temporary_path.c_str(), errno);
193  }
194  CountDuplicates();
195  }
196 
    // keep the callback pointer before the handle that stores it is destroyed
197  const CallbackTN *callback = handle->commit_callback;
198  delete local_handle;
199 
201 }
202 
207 void LocalUploader::DoRemoveAsync(const std::string &file_to_delete) {
208  const int retval = unlink((upstream_path_ + "/" + file_to_delete).c_str());
209  if ((retval != 0) && (errno != ENOENT))
210  atomic_inc32(&copy_errors_);
211  Respond(NULL, UploaderResults());
212 }
213 
214 bool LocalUploader::Peek(const std::string &path) {
215  bool retval = FileExists(upstream_path_ + "/" + path);
216  return retval;
217 }
218 
219 bool LocalUploader::Mkdir(const std::string &path) {
220  return MkdirDeep(upstream_path_ + "/" + path, backend_dir_mode_, false);
221 }
222 
// LocalUploader::PlaceBootstrappingShortcut(): creates a symlink for `object`
// via SymlinkForced() and returns that call's result.
// NOTE(review): the signature line is not part of this listing.
//
// The link is placed at upstream_path_ + "/" + object.MakeAlternativePath().
// Its target, "data/" + object.MakePath(), is built WITHOUT the upstream_path_
// prefix, i.e. it is stored as a relative link target.
224  const std::string src = "data/" + object.MakePath();
225  const std::string dest = upstream_path_ + "/" + object.MakeAlternativePath();
226  return SymlinkForced(src, dest);
227 }
228 
/**
 * Moves a local file to its place in the backend storage.
 *
 * The file is first chmod'ed to backend_file_mode_ and then renamed to
 * upstream_path_ + "/" + remote_path.
 *
 * NOTE(review): the opening lines of the two LogCvmfs() calls are not part
 * of this listing.
 *
 * @param local_path   path of the file to move
 * @param remote_path  destination, relative to upstream_path_
 * @return 0 on success, 101 if chmod() failed, otherwise the errno left by
 *         the failed rename()
 */
229 int LocalUploader::Move(const std::string &local_path,
230  const std::string &remote_path) const {
231  const std::string destination_path = upstream_path_ + "/" + remote_path;
232 
233  // make sure the file has the right permissions
234  int retval = chmod(local_path.c_str(), backend_file_mode_);
    // a chmod failure is reported with the fixed code 101, not with errno
235  int retcode = (retval == 0) ? 0 : 101;
236  if (retcode != 0) {
238  "failed to set file permission '%s' "
239  "errno: %d",
240  local_path.c_str(), errno);
241  return retcode;
242  }
243 
244  // move the file in place
245  retval = rename(local_path.c_str(), destination_path.c_str());
    // errno is captured into retcode before the logging call below
246  retcode = (retval == 0) ? 0 : errno;
247  if (retcode != 0) {
249  "failed to move file '%s' to '%s' "
250  "errno: %d",
251  local_path.c_str(), remote_path.c_str(), errno);
252  }
253 
254  return retcode;
255 }
256 
257 int64_t LocalUploader::DoGetObjectSize(const std::string &file_name) {
258  return GetFileSize(upstream_path_ + "/" + file_name);
259 }
260 
261 } // namespace upload
bool MakeCacheDirectories(const std::string &path, const mode_t mode)
Definition: posix.cc:890
const std::string upstream_path_
Definition: upload_local.h:89
mode_t GetUmask()
Definition: posix.cc:1387
bool SymlinkForced(const std::string &src, const std::string &dest)
Definition: posix.cc:842
virtual bool Create()
Definition: upload_local.cc:37
DriverType driver_type
the type of the spooler driver
int CreateAndOpenTemporaryChunkFile(std::string *path) const
LocalUploader(const SpoolerDefinition &spooler_definition)
Definition: upload_local.cc:17
UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback)
CVMFS_EXPORT const LogSource source
Definition: exception.h:33
FILE * CreateTempFile(const std::string &path_prefix, const int mode, const char *open_flags, std::string *final_path)
Definition: posix.cc:1013
void Respond(const CallbackTN *callback, const UploaderResults &result) const
bool Peek(const std::string &path)
const std::string temporary_path_
Definition: upload_local.h:90
bool HasSuffix() const
Definition: hash.h:231
virtual bool Close()=0
assert((mem||(size==0))&&"Out Of Memory")
void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback=NULL)
void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)
void CountUploadedChunks() const
bool FileExists(const std::string &path)
Definition: posix.cc:803
const CallbackTN * commit_callback
unsigned int GetNumberOfErrors() const
Definition: upload_local.cc:33
virtual bool Open()=0
int Move(const std::string &local_path, const std::string &remote_path) const
bool Mkdir(const std::string &path)
const char kSuffixPartial
Definition: hash.h:57
const char kSuffixCatalog
Definition: hash.h:54
bool MkdirDeep(const std::string &path, const mode_t mode, bool verify_writable)
Definition: posix.cc:855
void CountUploadedBytes(int64_t bytes_written) const
const std::string temporary_path
Definition: upload_local.h:27
const mode_t backend_file_mode_
Definition: upload_local.h:40
int64_t DoGetObjectSize(const std::string &file_name)
virtual std::string GetPath() const =0
bool PlaceBootstrappingShortcut(const shash::Any &object)
void CountUploadedCatalogBytes(int64_t bytes_written) const
void DoRemoveAsync(const std::string &file_to_delete)
const unsigned kPageSize
Definition: posix.h:30
void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback)
Definition: upload_local.cc:42
const mode_t backend_dir_mode_
Definition: upload_local.h:41
void CountUploadedCatalogs() const
virtual ssize_t Read(void *buffer, size_t nbyte)=0
int64_t GetFileSize(const std::string &path)
Definition: posix.cc:812
const int kLogVerboseMsg
atomic_int32 copy_errors_
Definition: upload_local.h:91
Suffix suffix
Definition: hash.h:123
std::string MakePath() const
Definition: hash.h:306
static bool WillHandle(const SpoolerDefinition &spooler_definition)
Definition: upload_local.cc:29
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545