在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
Httpfs是hadoop2.x中hdfs项目的内置应用,基于Tomcat和Jersey,对外提供完备HDFS操作的RESTful接口,无需安装客户端,可方便实现数据交互,如从windows访问存储在hdfs上的文件。本文通过Httpfs说明文档,实现了一个基于libcurl和jsoncpp的httpfs客户端程序(C++)。 1.准备工作 1.1 编译jsoncpp jsoncpp下载地址:https://codeload.github.com/open-source-parsers/jsoncpp/zip/master 使用VS2010打开jsoncpp解压文件夹/makefiles/msvc2010/jsoncpp.sln,选择lib_json,设置项目的属性。具体设置为:1)常规里设置配置类型为静态库(.lib),使用多字节字符集;2)C/C++->代码生成中的运行库选择 /MD(release)或 /MDd(debug)。编译环境必须与我们开发的工程一致!!!
1.2 编译libcurl libcurl下载地址:https://curl.haxx.se/download/curl-7.47.1.tar.gz 打开curl解压目录\projects\Windows\VC10\curl-all.sln,选择lib_debug和lib_release编译。VS2010引用静态库链接失败的解决方法: 1)给工程添加依赖的库:项目->属性->链接器->输入->附加依赖项,把libcurl.lib ws2_32.lib winmm.lib wldap32.lib添加进去(注意,debug配置用libcurld.lib)。 2)加入预处理器定义:项目->属性->C/C++->预处理器->预处理器定义,把;BUILDING_LIBCURL;HTTP_ONLY复制进去(注意不要丢了;)。 解决方案来自网络“vc2010使用libcurl静态库 遇到连接失败的解决方案”
1.3设置头文件引用 在工程路径下创建一个include目录,将libcurl和jsoncpp中的include文件夹下的文件复制到该include文件夹下,设置为vc++目录引用路径。
2.代码实现 HttpfsClient.H #pragma once #include <string> #include <vector> using namespace std; typedef struct FileStatus { __int64 accessTime; __int64 blocksize; string group; __int64 length; __int64 modificationTime; string owner; string pathSuffix; string permission; int replication; string type; }FileStatus; class CHttpFSClient { private: string m_hostaddr; //http://<HOST>:<PORT>/webhdfs/v1/ string m_username; //i.e. hadoop long m_timeout; long m_conntimeout; public: enum HTTP_TYPE{GET=0,PUT,POST,DEL}; public: CHttpFSClient(string& hostaddr,string& username); ~CHttpFSClient(void); bool create(string& local_file,string& rem_file,bool overwrite = false); bool append(string& local_file,string& rem_file); bool mkdirs(string& path); bool rename(string& src,string& dst); bool del(string& path, bool recursive=false); bool read(string& rem_file,string& local_file, long offset=0, long length=0); bool ls(string& rem_path,vector<FileStatus>& results); protected: static size_t fileread_callback(void *ptr, size_t size, size_t nmemb, void *stream); static size_t filewrite_data(const char *ptr, size_t size, size_t nmemb, void *stream); static size_t memwrite_data(const char *contents, size_t size, size_t nmemb, string *stream); static size_t header_callback(const char *ptr, size_t size, size_t nmemb, std::string *stream); void showFileStatus(vector<FileStatus>& results); };
HttpfsClient.cpp // HttpfsClient.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include "HttpfsClient.h" #include <assert.h> #include <stdio.h> #include <fcntl.h> #include <sys/stat.h> #include <curl/curl.h> #include <json/json.h> #include <iostream> #include <fstream> using namespace std; CHttpFSClient::CHttpFSClient(string& hostaddr,string& username) { m_hostaddr = hostaddr; m_username = username; m_timeout = 5184000; m_conntimeout = 120; /* In windows, this will init the winsock stuff */ curl_global_init(CURL_GLOBAL_ALL); } CHttpFSClient::~CHttpFSClient(void) { curl_global_cleanup(); } /* Create and Write to a File @param local_file string @param rem_file string @param overwirte: ture,false @return true/false Step 1: Submit a HTTP PUT request without automatically following redirects and without sending the file data. curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE [&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>] [&permission=<OCTAL>][&buffersize=<INT>]" The request is redirected to a datanode where the file data is to be written: HTTP/1.1 307 TEMPORARY_REDIRECT Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE... Content-Length: 0 Step 2: Submit another HTTP PUT request using the URL in the Location header with the file data to be written. curl -i -X PUT -T <LOCAL_FILE> "http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE..." 
The client receives a 201 Created response with zero content length and the WebHDFS URI of the file in the Location header: HTTP/1.1 201 Created Location: webhdfs://<HOST>:<PORT>/<PATH> Content-Length: 0 */ bool CHttpFSClient::create(string& local_file,string& rem_file,bool overwrite) { string url = m_hostaddr + rem_file + "?op=CREATE&user.name="+m_username; if(overwrite) url += "&overwrite=true"; string szheader_buffer; char* redir_url; string strredir_url; long response_code=0; bool curlerr = false; CURL *curl; CURLcode res; // get a curl handle curl = curl_easy_init(); if(curl) { curl_easy_setopt(curl, CURLOPT_PUT, 1L); curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L); curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout); curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 0L); curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0); //上传的字节数 res = curl_easy_perform(curl); // Check for errors if(res != CURLE_OK) { fprintf(stderr, "hdfs create first request failed: %s\n", curl_easy_strerror(res)); curlerr = true; } else { res = curl_easy_getinfo(curl,CURLINFO_REDIRECT_URL,&redir_url); if(res != CURLE_OK) { fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_REDIRECT_URL failed: %s\n", curl_easy_strerror(res)); curlerr = true; } strredir_url = redir_url; } } // always cleanup!!!! 
curl_easy_cleanup(curl); if(curlerr) return false; //upload file to hdfs struct stat file_info; // get the file size of the local file stat(local_file.c_str(), &file_info); FILE * hd_src; hd_src = fopen(local_file.c_str(), "rb"); if(GetLastError() != 0) return false; struct curl_slist *headers = NULL; headers = curl_slist_append(headers, "Content-Type:application/octet-stream"); headers = curl_slist_append(headers, "Content-Type:application/octet-stream"); curl = curl_easy_init(); if(curl) { // we want to use our own read function curl_easy_setopt(curl, CURLOPT_READFUNCTION, CHttpFSClient::fileread_callback); // enable uploading curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L); // HTTP PUT please curl_easy_setopt(curl, CURLOPT_PUT, 1L); // specify target URL, and note that this URL should include a file name, not only a directory curl_easy_setopt(curl, CURLOPT_URL, strredir_url.c_str()); // specify content type curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); // now specify which file to upload curl_easy_setopt(curl, CURLOPT_READDATA, hd_src); // provide the size of the upload, we specicially typecast the value to curl_off_t // since we must be sure to use the correct data size curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)file_info.st_size); // Now run off and do what you've been told! res = curl_easy_perform(curl); // Check for errors if(res != CURLE_OK) { fprintf(stderr, "upload file to hdfs failed: %s\n", curl_easy_strerror(res)); curlerr = true; } } fclose(hd_src); // close the local file // always cleanup!!!! curl_slist_free_all(headers); curl_easy_cleanup(curl); if(curlerr) return false; return true; } /* Append to a File @param local_file string @param rem_file string @return true/false Step 1: Submit a HTTP POST request without automatically following redirects and without sending the file data. 
curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND[&buffersize=<INT>]" The request is redirected to a datanode where the file data is to be appended: HTTP/1.1 307 TEMPORARY_REDIRECT Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=APPEND... Content-Length: 0 Step 2: Submit another HTTP POST request using the URL in the Location header with the file data to be appended. curl -i -X POST -T <LOCAL_FILE> "http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=APPEND..." The client receives a response with zero content length: HTTP/1.1 200 OK Content-Length: 0 */ bool CHttpFSClient::append(string& local_file,string& rem_file) { string url = m_hostaddr + rem_file + "?op=APPEND&user.name="+m_username; char* redir_url; string strredir_url; long response_code=0; bool curlerr = false; CURL *curl; CURLcode res; // get a curl handle curl = curl_easy_init(); if(curl) { curl_easy_setopt(curl, CURLOPT_POST, 1L); curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 0L); curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, 0); res = curl_easy_perform(curl); // Check for errors if(res != CURLE_OK) { fprintf(stderr, "hdfs append first request failed: %s\n", curl_easy_strerror(res)); curlerr = true; } else { res = curl_easy_getinfo(curl,CURLINFO_REDIRECT_URL,&redir_url); if(res != CURLE_OK) { fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_REDIRECT_URL failed: %s\n", curl_easy_strerror(res)); curlerr = true; } strredir_url = redir_url; } } // always cleanup!!!! 
curl_easy_cleanup(curl); if(curlerr) return false; // append file to hdfs struct curl_slist *headers = NULL; headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); curl = curl_easy_init(); if(curl) { curl_easy_setopt(curl, CURLOPT_POST, 1L); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); curl_easy_setopt(curl, CURLOPT_URL, strredir_url.c_str()); //curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); /*//multipart/formdata请求 struct curl_httppost *formpost = NULL; struct curl_httppost *lastptr = NULL; curl_formadd(&formpost, &lastptr, CURLFORM_COPYNAME, "file", CURLFORM_FILE, local_file.c_str(), CURLFORM_CONTENTTYPE, "application/octet-stream", CURLFORM_END); curl_easy_setopt(curl, CURLOPT_HTTPPOST, formpost);*/ //C++代码一次读取文本文件全部内容到string对象 ifstream fin(local_file.c_str(),ios::in); istreambuf_iterator<char> beg(fin), end; string strdata(beg, end); fin.close(); curl_easy_setopt(curl,CURLOPT_POSTFIELDS,strdata.c_str()); res = curl_easy_perform(curl); //curl_formfree(formpost); // Check for errors if(res != CURLE_OK) { fprintf(stderr, "append file to hdfs failed: %s\n", curl_easy_strerror(res)); curlerr = true; } else { res = curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&response_code); if(res != CURLE_OK) { fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\n", curl_easy_strerror(res)); curlerr = true; } } } // always cleanup!!!! curl_slist_free_all(headers); curl_easy_cleanup(curl); if(curlerr) return false; if(response_code == 200) return true; else return false; } /* Make a Directory Submit a HTTP PUT request. 
curl -i -X PUT "http://<HOST>:<PORT>/<PATH>?op=MKDIRS[&permission=<OCTAL>]" The client receives a response with a boolean JSON object: HTTP/1.1 200 OK Content-Type: application/json Transfer-Encoding: chunked {"boolean": true} */ bool CHttpFSClient::mkdirs(string& path) { string url = m_hostaddr + path + "?op=MKDIRS&user.name="+m_username; long response_code=0; long headerlen = 0; bool curlerr = false; string response_contents; CURL *curl; CURLcode res; // get a curl handle curl = curl_easy_init(); if(curl) { // http put curl_easy_setopt(curl, CURLOPT_PUT, 1L); curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_HEADER, 1L); curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout); curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::memwrite_data); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_contents); curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0); res = curl_easy_perform(curl); // Check for errors if(res != CURLE_OK) { fprintf(stderr, "hdfs mkdirs failed: %s\n", curl_easy_strerror(res)); curlerr = true; } else { res = curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&response_code); if(res != CURLE_OK) { fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\n", curl_easy_strerror(res)); curlerr = true; } res = curl_easy_getinfo(curl,CURLINFO_HEADER_SIZE,&headerlen); if(res != CURLE_OK) { fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_HEADER_SIZE failed: %s\n", curl_easy_strerror(res)); curlerr = true; } } } // always cleanup!!!! curl_easy_cleanup(curl); if(curlerr) return false; if(response_code == 200) { Json::Reader reader; Json::Value root; const char *content = response_contents.c_str(); if(!reader.parse(content+headerlen,content+response_contents.length(),root,false)) return false; return root["boolean"].asBool(); } else return false; } /* Rename a File/Directory Submit a HTTP PUT request. 
curl -i -X PUT "<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>" The client receives a response with a boolean JSON object: HTTP/1.1 200 OK Content-Type: application/json Transfer-Encoding: chunked {"boolean": true} */ bool CHttpFSClient::rename(string& src,string& dst) { string url = m_hostaddr + src + "?op=RENAME&user.name="+m_username+"&destination="+dst; long response_code=0; long headerlen = 0; bool curlerr = false; string response_contents; CURL *curl; CURLcode res; // get a curl handle curl = curl_easy_init(); if(curl) { // http put curl_easy_setopt(curl, CURLOPT_PUT, 1L); curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_HEADER, 1L); curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout); curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::memwrite_data); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_contents); curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0); res = curl_easy_perform(curl); // Check for errors if(res != CURLE_OK) { fprintf(stderr, "hdfs rename failed: %s\n", curl_easy_strerror(res)); curlerr = true; } else { res = curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&response_code); if(res != CURLE_OK) { fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\n", curl_easy_strerror(res)); curlerr = true; } res = curl_easy_getinfo(curl,CURLINFO_HEADER_SIZE,&headerlen); if(res != CURLE_OK) { fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_HEADER_SIZE failed: %s\n", curl_easy_strerror(res)); curlerr = true; } } } // always cleanup!!!! 
curl_easy_cleanup(curl); if(curlerr) return false; if(response_code == 200) { Json::Reader reader; Json::Value root; const char *content = response_contents.c_str(); if(!reader.parse(content+headerlen,content+response_contents.length(),root,false)) return false; return root["boolean"].asBool(); } else return false; } /* Delete a File/Directory @param file string, the file or directory to be deleted @return ture/false Submit a HTTP DELETE request curl -i -X DELETE "http://<host>:<port>/webhdfs/v1/<path>?op=DELETE [&recursive=<true|false>]" The client receives a response with a boolean JSON object: HTTP/1.1 200 OK Content-Type: application/json Transfer-Encoding: chunked {"boolean": true} */ bool CHttpFSClient::del(string& path, bool recursive) { string url = m_hostaddr + path + "?op=DELETE&user.name="+m_username; if(recursive) url+="&recursive=true"; string response_contents; char redir_url[100]; long response_code=0; long headerlen = 0; bool curlerr = false; CURL *curl; CURLcode res; // get a curl handle curl = curl_easy_init(); if(curl) { // Set the DELETE command curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE"); curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_HEADER, 1L); curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout); curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::memwrite_data); curl_easy_setopt(curl,CURLOPT_WRITEDATA,&response_contents); res = curl_easy_perform(curl); // Check for errors if(res != CURLE_OK) { fprintf(stderr, "hdfs del failed: %s\n", curl_easy_strerror(res)); curlerr = true; } else { res = curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&response_code); if(res != CURLE_OK) { fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\n", curl_easy_strerror(res)); curlerr = true; } res = curl_easy_getinfo(curl,CURLINFO_HEADER_SIZE,&headerlen); if(res != CURLE_OK) { fprintf(stderr, "curl_easy_getinfo 
CURLINFO::CURLINFO_HEADER_SIZE failed: %s\n", curl_easy_strerror(res)); curlerr = true; } } } // always cleanup!!!! curl_easy_cleanup(curl); if(curlerr) return false; if(response_code == 200) { Json::Reader reader; Json::Value root; const char *content = response_contents.c_str(); if(!reader.parse(content+headerlen,content+response_contents.length(),root,false)) return false; return root["boolean"].asBool(); } else return false; } /* Open and Read a File of remote an write to local_file @param @remote_file @param @local_file Submit a HTTP GET request with automatically following redirects. curl -i -L "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN [&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]" The request is redirected to a datanode where the file data can be read: HTTP/1.1 307 TEMPORARY_REDIRECT Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=OPEN... Content-Length: 0 The client follows the redirect to the datanode and receives the file data: HTTP/1.1 200 OK Content-Type: application/octet-stream Content-Length: 22 Hello, webhdfs user! 
*/ bool CHttpFSClient::read(string& rem_file,string& local_file, long offset, long length) { char url[200]; if(offset != 0 && length != 0) sprintf_s(url,200,"%s%s?op=OPEN&user.name=%s&offset=%ld&length=%ld",m_hostaddr.c_str(),rem_file.c_str(),m_username.c_str(),offset,length); else sprintf_s(url,200,"%s%s?op=OPEN&user.name=%s",m_hostaddr.c_str(),rem_file.c_str(),m_username.c_str()); long response_code=0; bool curlerr = false; CURL *curl; CURLcode res; // get a curl handle curl = curl_easy_init(); if(curl) { // HTTP GET please curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L); // specify target URL, and note that this URL should include a file name, not only a directory curl_easy_setopt(curl, CURLOPT_URL, url); /* send all data to this function */ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::filewrite_data); FILE * pagefile; pagefile = fopen(local_file.c_str(), "wb"); if(GetLastError() != 0) return false; // write the page body to this file handle curl_easy_setopt(curl, CURLOPT_WRITEDATA, pagefile); // Now run off and do what you've been told! res = curl_easy_perform(curl); // Check for errors if(res != CURLE_OK) { fprintf(stderr, "get file from hdfs failed: %s\n", curl_easy_strerror(res)); curlerr = true; } fclose(pagefile); // close the local file } // always cleanup!!!! curl_easy_cleanup(curl); if(curlerr) return false; return true; } /* list a directory @param $dir string, the dir to list @return json object Submit a HTTP GET request. 
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS" The client receives a response with a FileStatuses JSON object: HTTP/1.1 200 OK Content-Type: application/json Content-Length: 427 { "FileStatuses": { "FileStatus": [ { "accessTime" : 1320171722771, "blockSize" : 33554432, "group" : "supergroup", "length" : 24930, "modificationTime": 1320171722771, "owner" : "webuser", "pathSuffix" : "a.patch", "permission" : "644", "replication" : 1, "type" : "FILE" }, { "accessTime" : 0, "blockSize" : 0, "group" : "supergroup", "length" : 0, "modificationTime": 1320895981256, "owner" : "szetszwo", "pathSuffix" : "bar", "permission" : "711", "replication" : 0, "type" : "DIRECTORY" }, ... ] } } */ bool CHttpFSClient::ls(string& rem_path,vector<FileStatus>& results) { string url = m_hostaddr + rem_path + "?op=LISTSTATUS&user.name="+m_username; long response_code=0; long headerlen = 0; bool curlerr = false; string response_contents; CURL *curl; CURLcode res; // get a curl handle curl = curl_easy_init(); 全部评论
专题导读
热门推荐
热门话题
阅读排行榜
|
请发表评论