10 #include "curl/curl.h"
// Set to the largest value representable in a uint32_t.
// NOTE(review): the name suggests an upper bound on the number of jobs; where
// it is consumed is not visible in this excerpt -- confirm against the users.
23 const uint32_t
kMaxNumJobs = std::numeric_limits<uint32_t>::max();
// Read callback for libcurl: the upload code in this file registers it with
// CURLOPT_READFUNCTION and hands the payload over through CURLOPT_READDATA,
// which arrives here as userp.
//
// NOTE(review): this listing omits several statements of the function body;
// the comments describe only what the visible lines establish.
//
//   ptr    buffer for this call; presumably the destination of the outgoing
//          bytes (the visible code offsets it by the bytes produced so far)
//   size   size of one item
//   nmemb  number of items; size * nmemb is the capacity of this call
//   userp  opaque user pointer; presumably the payload object whose `index`
//          is advanced below -- the cast is not visible here, confirm
//
// Returns current_chunk_size, i.e. the number of bytes produced in this call.
28 size_t SendCB(
void *ptr,
size_t size,
size_t nmemb,
void *userp) {
// Capacity of the buffer offered for this call.
31 size_t max_chunk_size = size * nmemb;
32 if (max_chunk_size < 1) {
// Running count of bytes produced so far; this is also the return value.
36 size_t current_chunk_size = 0;
// Keep producing until the offered capacity is used up (exit conditions inside
// the loop are not visible in this excerpt).
37 while (current_chunk_size < max_chunk_size) {
// read_size is bounded by the space still free in this chunk; the second
// operand of std::min is not visible here.
40 const size_t read_size = std::min(
41 max_chunk_size - current_chunk_size,
43 current_chunk_size += read_size;
// Advance the payload's position by the amount just consumed.
46 payload->
index += read_size;
// Space left in the chunk for the next producer.
49 const size_t max_read_size = max_chunk_size - current_chunk_size;
52 static_cast<unsigned char *>(ptr) + current_chunk_size);
53 current_chunk_size += nbytes;
61 return current_chunk_size;
/**
 * Write callback for libcurl (installed through CURLOPT_WRITEFUNCTION): stores
 * the bytes of a received chunk in the std::string passed as userp.
 *
 * libcurl does not NUL-terminate the data it delivers, so the chunk is copied
 * with its explicit length (size * nmemb) instead of being read as a C string.
 * A reply may arrive in several pieces; each piece is appended so that the
 * string holds everything received once the transfer is finished.
 *
 * @param buffer  start of the received chunk (not NUL-terminated)
 * @param size    size of one data item
 * @param nmemb   number of items in the chunk
 * @param userp   the std::string * registered with CURLOPT_WRITEDATA
 * @return number of bytes consumed; libcurl treats any value other than
 *         size * nmemb as a write error and aborts the transfer
 */
size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp) {
  std::string *my_buffer = static_cast<std::string *>(userp);

  const size_t num_bytes = size * nmemb;
  if (num_bytes < 1) {
    // Empty chunk: nothing to store, and 0 == size * nmemb signals success.
    return 0;
  }

  // Copy exactly num_bytes; never rely on a terminator that is not there.
  my_buffer->append(static_cast<const char *>(buffer), num_bytes);

  return num_bytes;
}
87 , bytes_dispatched_(0)
88 , initialized_(false) { }
93 const std::string &session_token,
94 const std::string &key_id,
95 const std::string &secret,
96 uint64_t max_pack_size,
97 uint64_t max_queue_size) {
101 pthread_mutexattr_t attr;
102 if (pthread_mutexattr_init(&attr)
103 || pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)
105 || pthread_mutexattr_destroy(&attr)) {
107 "Could not initialize SessionContext lock.");
127 "Could not initialize SessionContext - Existing open object packs.");
139 const std::string &new_root_hash,
159 results = future->
Get() && results;
164 if (old_root_hash.empty() || new_root_hash.empty()) {
167 bool commit_result =
Commit(old_root_hash, new_root_hash, tag);
168 if (!commit_result) {
170 "SessionContext: could not commit session. Aborting.");
200 const std::string &name,
201 const bool force_dispatch) {
206 "Error: Called SessionBaseContext::CommitBucket without an open "
220 if (force_dispatch) {
225 uint64_t new_size = 0;
266 reinterpret_cast<void *>(
this));
287 const std::string &new_root_hash,
290 request_input.
Add(
"old_root_hash", old_root_hash);
291 request_input.
Add(
"new_root_hash", new_root_hash);
292 request_input.
Add(
"tag_name", tag.
name());
296 request_input.
Add(
"tag_channel", 0);
319 const std::string json_msg =
"{\"session_token\" : \"" +
session_token_
320 +
"\", \"payload_digest\" : \""
322 +
"\", \"header_size\" : \""
324 +
"\", \"api_version\" : \""
336 const size_t payload_size = json_msg.size() + serializer.
GetHeaderSize()
340 CURL *h_curl = curl_easy_init();
347 std::string header_str = std::string(
"Authorization: ") +
key_id_ +
" "
348 +
Base64(hmac.ToString(
false));
349 struct curl_slist *auth_header = NULL;
350 auth_header = curl_slist_append(auth_header, header_str.c_str());
351 header_str = std::string(
"Message-Size: ") +
StringifyInt(json_msg.size());
352 auth_header = curl_slist_append(auth_header, header_str.c_str());
353 curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);
356 curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
357 curl_easy_setopt(h_curl, CURLOPT_USERAGENT,
"cvmfs/" CVMFS_VERSION);
358 curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
359 curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST,
"POST");
360 curl_easy_setopt(h_curl, CURLOPT_URL, (
api_url_ +
"/payloads").c_str());
361 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
362 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
363 static_cast<curl_off_t>(payload_size));
364 curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
365 curl_easy_setopt(h_curl, CURLOPT_READFUNCTION,
SendCB);
366 curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION,
RecvCB);
367 curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);
370 CURLcode ret = curl_easy_perform(h_curl);
373 "SessionContext::DoUpload - curl_easy_perform failed: %d", ret);
378 reply_json->
root(),
"status", JSON_STRING);
379 const bool ok = (reply_status != NULL
380 && std::string(reply_status->string_value) ==
"ok");
383 "SessionContext::DoUpload - error reply: %s", reply.c_str());
386 curl_easy_cleanup(h_curl);
401 PANIC(
kLogStderr,
"SessionContext: could not submit payload. Aborting.");
static void * UploadLoop(void *data)
UniquePtr< Tube< UploadJob > > upload_jobs_
void Add(const std::string &key, const std::string &val)
struct cvmcache_context * ctx
std::string description() const
bool MakeEndRequest(const std::string &method, const std::string &key_id, const std::string &secret, const std::string &session_token, const std::string &repo_service_url, const std::string &request_payload, CurlBuffer *reply)
std::string session_token_
static JSON * SearchInObject(const JSON *json_object, const std::string &name, const json_type type)
virtual Future< bool > * DispatchObjectPack(ObjectPack *pack)=0
void TransferBucket(const BucketHandle handle, ObjectPack *other)
std::string ToString(const bool with_suffix=false) const
pthread_mutex_t current_pack_mtx_
virtual bool InitializeDerived(uint64_t max_queue_size)
assert((mem||(size==0))&&"Out Of Memory")
uint64_t bytes_dispatched_
void Set(const T &object)
virtual bool Commit(const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)=0
ObjectPack::BucketHandle NewBucket()
virtual Future< bool > * DispatchObjectPack(ObjectPack *pack)
virtual bool Commit(const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
static JsonDocument * Create(const std::string &text)
uint64_t bytes_committed_
Link * EnqueueFront(ItemT *item)
bool CommitBucket(const ObjectPack::BucketContentType type, const shash::Any &id, const ObjectPack::BucketHandle handle, const std::string &name="", const bool force_dispatch=false)
virtual bool DoUpload(const UploadJob *job)
const std::string * json_message
ObjectPackProducer * pack_serializer
virtual bool FinalizeDerived()=0
void GetDigest(shash::Any *hash)
string StringifyInt(const int64_t value)
std::vector< ObjectPack::BucketHandle > active_handles_
virtual ~SessionContextBase()
bool Finalize(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
string Base64(const string &data)
void HmacString(const std::string &key, const std::string &content, Any *any_digest)
virtual bool InitializeDerived(uint64_t max_queue_size)=0
static UploadJob terminator_
size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp)
bool CommitBucket(const BucketContentType type, const shash::Any &id, const BucketHandle handle, const std::string &name="")
ObjectPack * current_pack_
Tube< Future< bool > > upload_results_
size_t GetNoObjects() const
bool Initialize(const std::string &api_url, const std::string &session_token, const std::string &key_id, const std::string &secret, uint64_t max_pack_size=ObjectPack::kDefaultLimit, uint64_t max_queue_size=10)
std::string GenerateString() const
virtual bool FinalizeDerived()
size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp)
const uint32_t kMaxNumJobs
unsigned ProduceNext(const unsigned buf_size, unsigned char *buf)
const JSON * root() const
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)