POST 스크립트에서 Python를 사용하여 파일을 보내는 방법이 있습니까?
보낸 사람 : http://docs.python-requests.org/en/latest/user/quickstart/#post-a-multipart-encoded-file
요청으로 멀티 파트 인코딩 파일을 업로드하는 것이 매우 간단합니다.
with open('report.xls', 'rb') as f:
r = requests.post('http://httpbin.org/post', files={'report.xls': f})
그게 다야. 나는 농담하지 않습니다-이것은 한 줄의 코드입니다. 파일이 전송되었습니다. 점검 해보자:
>>> r.text
{
"Origin": "179.13.100.4",
"files": {
"report.xls": "<censored...binary...data>"
},
"form": {},
"url": "http://httpbin.org/post",
"args": {},
"headers": {
"Content-Length": "3196",
"Accept-Encoding": "identity, deflate, compress, gzip",
"Accept": "*/*",
"User-Agent": "python-requests/0.8.0",
"Host": "httpbin.org:80",
"Content-Type": "multipart/form-data; boundary=127.0.0.1.502.21746.1321131593.786.1"
},
"data": ""
}
예. urllib2
모듈을 사용하고 multipart/form-data
컨텐츠 유형을 사용하여 인코딩하십시오. 시작하는 데 도움이되는 샘플 코드는 다음과 같습니다. 파일 업로드 이상의 것이 아니라 읽을 수 있으며 작동 방식을 확인할 수 있습니다.
user_agent = "image uploader"
default_message = "Image $current of $total"
import logging
import os
from os.path import abspath, isabs, isdir, isfile, join
import random
import string
import sys
import mimetypes
import urllib2
import httplib
import time
import re
def random_string (length):
return ''.join (random.choice (string.letters) for ii in range (length + 1))
def encode_multipart_data (data, files):
boundary = random_string (30)
def get_content_type (filename):
return mimetypes.guess_type (filename)[0] or 'application/octet-stream'
def encode_field (field_name):
return ('--' + boundary,
'Content-Disposition: form-data; name="%s"' % field_name,
'', str (data [field_name]))
def encode_file (field_name):
filename = files [field_name]
return ('--' + boundary,
'Content-Disposition: form-data; name="%s"; filename="%s"' % (field_name, filename),
'Content-Type: %s' % get_content_type(filename),
'', open (filename, 'rb').read ())
lines = []
for name in data:
lines.extend (encode_field (name))
for name in files:
lines.extend (encode_file (name))
lines.extend (('--%s--' % boundary, ''))
body = '\r\n'.join (lines)
headers = {'content-type': 'multipart/form-data; boundary=' + boundary,
'content-length': str (len (body))}
return body, headers
def send_post (url, data, files):
req = urllib2.Request (url)
connection = httplib.HTTPConnection (req.get_Host ())
connection.request ('POST', req.get_selector (),
*encode_multipart_data (data, files))
response = connection.getresponse ()
logging.debug ('response = %s', response.read ())
logging.debug ('Code: %s %s', response.status, response.reason)
def make_upload_file (server, thread, delay = 15, message = None,
username = None, email = None, password = None):
delay = max (int (delay or '0'), 15)
def upload_file (path, current, total):
assert isabs (path)
assert isfile (path)
logging.debug ('Uploading %r to %r', path, server)
message_template = string.Template (message or default_message)
data = {'MAX_FILE_SIZE': '3145728',
'sub': '',
'mode': 'regist',
'com': message_template.safe_substitute (current = current, total = total),
'resto': thread,
'name': username or '',
'email': email or '',
'pwd': password or random_string (20),}
files = {'upfile': path}
send_post (server, data, files)
logging.info ('Uploaded %r', path)
Rand_delay = random.randint (delay, delay + 5)
logging.debug ('Sleeping for %.2f seconds------------------------------\n\n', Rand_delay)
time.sleep (Rand_delay)
return upload_file
def upload_directory (path, upload_file):
assert isabs (path)
assert isdir (path)
matching_filenames = []
file_matcher = re.compile (r'\.(?:jpe?g|gif|png)$', re.IGNORECASE)
for dirpath, dirnames, filenames in os.walk (path):
for name in filenames:
file_path = join (dirpath, name)
logging.debug ('Testing file_path %r', file_path)
if file_matcher.search (file_path):
matching_filenames.append (file_path)
else:
logging.info ('Ignoring non-image file %r', path)
total_count = len (matching_filenames)
for index, file_path in enumerate (matching_filenames):
upload_file (file_path, index + 1, total_count)
def run_upload (options, paths):
upload_file = make_upload_file (**options)
for arg in paths:
path = abspath (arg)
if isdir (path):
upload_directory (path, upload_file)
Elif isfile (path):
upload_file (path)
else:
logging.error ('No such path: %r' % path)
logging.info ('Done!')
파일 객체에서 urlopen을 직접 사용하지 못하게하는 유일한 방법은 내장 파일 객체에 len 정의가 없다는 것입니다. 간단한 방법은 urlopen에 올바른 파일을 제공하는 서브 클래스를 작성하는 것입니다. 또한 아래 파일에서 Content-Type 헤더를 수정했습니다.
import os
import urllib2
class EnhancedFile(file):
def __init__(self, *args, **keyws):
file.__init__(self, *args, **keyws)
def __len__(self):
return int(os.fstat(self.fileno())[6])
theFile = EnhancedFile('a.xml', 'r')
theUrl = "http://example.com/abcde"
theHeaders= {'Content-Type': 'text/xml'}
theRequest = urllib2.Request(theUrl, theFile, theHeaders)
response = urllib2.urlopen(theRequest)
theFile.close()
for line in response:
print line
Chris Atlee의 poster 라이브러리는이 작업에 특히 효과적입니다 (특히 편의 기능 poster.encode.multipart_encode()
). 또한 전체 파일을 메모리에로드하지 않고도 대용량 파일 스트리밍을 지원합니다. Python issue 3244 도 참조하십시오.
Django rest api와 그 작업을 테스트하려고합니다.
def test_upload_file(self):
filename = "/Users/Ranvijay/tests/test_price_matrix.csv"
data = {'file': open(filename, 'rb')}
client = APIClient()
# client.credentials(HTTP_AUTHORIZATION='Token ' + token.key)
response = client.post(reverse('price-matrix-csv'), data, format='multipart')
print response
self.assertEqual(response.status_code, status.HTTP_200_OK)
def visit_v2(device_code, camera_code):
image1 = MultipartParam.from_file("files", "/home/yuzx/1.txt")
image2 = MultipartParam.from_file("files", "/home/yuzx/2.txt")
datagen, headers = multipart_encode([('device_code', device_code), ('position', 3), ('person_data', person_data), image1, image2])
print "".join(datagen)
if server_port == 80:
port_str = ""
else:
port_str = ":%s" % (server_port,)
url_str = "http://" + server_ip + port_str + "/adopen/device/visit_v2"
headers['nothing'] = 'nothing'
request = urllib2.Request(url_str, datagen, headers)
try:
response = urllib2.urlopen(request)
resp = response.read()
print "http_status =", response.code
result = json.loads(resp)
print resp
return result
except urllib2.HTTPError, e:
print "http_status =", e.code
print e.read()