import http.client as http
import os
import json
import io
import mimetypes
from collections import OrderedDict
import logging
from urllib.parse import quote_plus
from requests import Response
try:
from requests_toolbelt import MultipartEncoder
except ImportError:
from acdcli.bundled.encoder import MultipartEncoder
from .common import *
#: Basic chunk size (bytes) for file system read/write operations.
FS_RW_CHUNK_SZ = 128 * 1024

#: Suffix (file ending) appended to the names of incomplete download files.
PARTIAL_SUFFIX = '.__incomplete'

#: Size (bytes) of one ranged download request; the value is basically arbitrary.
CHUNK_SIZE = 500 * 1024 * 1024

#: Maximum number of consecutive retries for a failed download chunk.
CHUNK_MAX_RETRY = 5

logger = logging.getLogger(__name__)
class _TeeBufferedReader(object):
    """Proxy buffered reader object that allows callbacks on read operations.

    Every chunk returned by :meth:`read` is passed to each registered callback
    before it is handed to the caller. All other attribute accesses are
    delegated to the wrapped file object.
    """

    def __init__(self, file: io.BufferedReader, callbacks: list = None):
        """:param file: binary file object to proxy
        :param callbacks: callables invoked, in order, with each chunk read"""
        self._file = file
        self._callbacks = callbacks

    def __getattr__(self, item):
        # Only invoked when normal attribute lookup fails, so everything not
        # defined on this class is looked up on the wrapped file.
        # Refusing '_file' itself avoids infinite recursion on instances whose
        # __init__ never ran (e.g. created through __new__ by copy/pickle).
        if item == '_file':
            raise AttributeError(item)
        return getattr(self._file, item)

    def read(self, ln=-1):
        """Read from the wrapped file and pass the chunk to every callback.

        :param ln: ``-1`` or ``None`` reads to EOF and ``0`` reads nothing; any
            other value is replaced by FS_RW_CHUNK_SZ, so a single call may
            return more bytes than requested
        :returns: the bytes read (``b''`` at EOF)"""
        if ln not in (0, -1, None):
            ln = FS_RW_CHUNK_SZ
        chunk = self._file.read(ln)
        for callback in self._callbacks or []:
            callback(chunk)
        return chunk
def _tee_open(path: str, **kwargs) -> _TeeBufferedReader:
    """Open the file at *path* for binary reading behind a :class:`_TeeBufferedReader`.

    :param path: path of the file to open
    :param kwargs: forwarded to :class:`_TeeBufferedReader` (e.g. ``callbacks``)"""
    return _TeeBufferedReader(open(path, 'rb'), **kwargs)
def _get_mimetype(file_name: str = '') -> str:
    """Guess the MIME type of *file_name* from its extension.

    :returns: the guessed type, or ``'application/octet-stream'`` if none is known"""
    guessed, _encoding = mimetypes.guess_type(file_name)
    return guessed or 'application/octet-stream'
def _multipart_stream(metadata: dict, stream, boundary: str, read_callbacks=None,
                      chunk_size: int = None):
    """Generator for chunked multipart/form-data file upload from stream input.

    Yields the encoded request body piece by piece, reading *stream* in chunks,
    so the whole payload never has to be held in memory.

    :param metadata: file info, leave empty for overwrite
    :param stream: readable object
    :param boundary: multipart boundary token used as the part delimiter
    :param read_callbacks: callables invoked with every chunk read from *stream*
    :param chunk_size: bytes per read from *stream*; FS_RW_CHUNK_SZ if falsy"""
    size = chunk_size or FS_RW_CHUNK_SZ
    if metadata:
        yield ('--%s\r\nContent-Disposition: form-data; name="metadata"\r\n\r\n'
               '%s\r\n' % (boundary, json.dumps(metadata))).encode()
    yield ('--%s\r\n' % boundary).encode() + \
        b'Content-Disposition: form-data; name="content"; filename="foo"\r\n' + \
        b'Content-Type: application/octet-stream\r\n\r\n'
    while True:
        f = stream.read(size)
        if not f:
            break
        for cb in read_callbacks or []:
            cb(f)
        yield f
    # Close delimiter only; no text may follow it (the previous version appended
    # a stray copy of the Content-Type header value here).
    yield ('\r\n--%s--\r\n' % boundary).encode()
class ContentMixin(object):
    """Implements content portion of the ACD API.

    Intended as a mixin: the host class must provide ``BOReq`` (a request
    helper with ``get``/``post``/``put`` methods), ``metadata_url`` and
    ``content_url`` (URL prefixes to which ``'nodes...'`` is appended).
    """

    def create_folder(self, name: str, parent=None) -> dict:
        """Create a folder node.

        :param name: name of the new folder
        :param parent: parent node ID; omitted from the request if falsy
        :returns: the decoded JSON response
        :raises RequestError: if the response status is not 201 Created"""
        body = {'kind': 'FOLDER', 'name': name}
        if parent:
            body['parents'] = [parent]
        body_str = json.dumps(body)
        acc_codes = [http.CREATED]
        r = self.BOReq.post(self.metadata_url + 'nodes', acc_codes=acc_codes, data=body_str)
        if r.status_code not in acc_codes:
            raise RequestError(r.status_code, r.text)
        return r.json()

    def create_file(self, file_name: str, parent: str = None) -> dict:
        """Create an empty file node named after the basename of *file_name*.

        Deduplication is always suppressed.

        :param file_name: file name or path; only its basename is used
        :param parent: parent node ID; omitted from the metadata if falsy
        :returns: the decoded JSON response
        :raises RequestError: if the response status is not 201 Created"""
        params = {'suppress': 'deduplication'}
        basename = os.path.basename(file_name)
        metadata = {'kind': 'FILE', 'name': basename}
        if parent:
            metadata['parents'] = [parent]
        mime_type = _get_mimetype(basename)
        f = io.BytesIO()
        # basename is ignored (per original note); the node name is sent in 'metadata'
        m = MultipartEncoder(fields=OrderedDict([
            ('metadata', json.dumps(metadata)),
            ('content', (quote_plus(basename), f, mime_type))]))
        ok_codes = [http.CREATED]
        r = self.BOReq.post(self.content_url + 'nodes', params=params, data=m,
                            acc_codes=ok_codes, headers={'Content-Type': m.content_type})
        if r.status_code not in ok_codes:
            raise RequestError(r.status_code, r.text)
        return r.json()

    def clear_file(self, node_id: str) -> dict:
        """Clears a file's content by overwriting it with an empty BytesIO.

        :param node_id: valid file node ID
        :returns: the decoded JSON response
        :raises RequestError: if the response status is not in ``OK_CODES``"""
        # a list of (name, value) field tuples, not a set literal
        m = MultipartEncoder(fields=[('content', (' ', io.BytesIO(), _get_mimetype()))])
        r = self.BOReq.put(self.content_url + 'nodes/' + node_id + '/content', params={},
                           data=m, stream=True, headers={'Content-Type': m.content_type})
        if r.status_code not in OK_CODES:
            raise RequestError(r.status_code, r.text)
        return r.json()

    def upload_file(self, file_name: str, parent: str = None,
                    read_callbacks=None, deduplication=False) -> dict:
        """Upload the local file *file_name* as a new file node.

        :param file_name: path of the local file; its basename becomes the node name
        :param parent: parent node ID; omitted from the metadata if falsy
        :param read_callbacks: callables invoked with each chunk read from the file
        :param deduplication: if true, do not ask the server to suppress
            deduplication (it is still suppressed for empty files)
        :returns: the decoded JSON response
        :raises RequestError: if the response status is not 201 Created"""
        params = {'suppress': 'deduplication'}
        if deduplication and os.path.getsize(file_name) > 0:
            params = {}
        basename = os.path.basename(file_name)
        metadata = {'kind': 'FILE', 'name': basename}
        if parent:
            metadata['parents'] = [parent]
        mime_type = _get_mimetype(basename)
        ok_codes = [http.CREATED]
        f = _tee_open(file_name, callbacks=read_callbacks)
        try:
            # basename is ignored (per original note); the node name is sent in 'metadata'
            m = MultipartEncoder(fields=OrderedDict([
                ('metadata', json.dumps(metadata)),
                ('content', (quote_plus(basename), f, mime_type))]))
            r = self.BOReq.post(self.content_url + 'nodes', params=params, data=m,
                                acc_codes=ok_codes, stream=True,
                                headers={'Content-Type': m.content_type})
        finally:
            # post() has returned a response, so the body was sent; release the handle
            f.close()
        if r.status_code not in ok_codes:
            raise RequestError(r.status_code, r.text)
        return r.json()

    def upload_stream(self, stream, file_name: str, parent: str = None,
                      read_callbacks=None, deduplication=False) -> dict:
        """Upload the content of *stream* as a new file node named *file_name*.

        :param stream: readable object
        :param file_name: name of the new node
        :param parent: parent node id, defaults to root node if None
        :param read_callbacks: callables invoked with each chunk read from *stream*
        :param deduplication: if false, ask the server to suppress deduplication
        :returns: the decoded JSON response
        :raises RequestError: if the response status is not 201 Created"""
        params = {} if deduplication else {'suppress': 'deduplication'}
        metadata = {'kind': 'FILE', 'name': file_name}
        if parent:
            metadata['parents'] = [parent]
        import uuid
        boundary = uuid.uuid4().hex
        ok_codes = [http.CREATED]
        r = self.BOReq.post(self.content_url + 'nodes', params=params,
                            data=_multipart_stream(metadata, stream, boundary, read_callbacks),
                            acc_codes=ok_codes,
                            headers={'Content-Type': 'multipart/form-data; boundary=%s'
                                                     % boundary})
        if r.status_code not in ok_codes:
            raise RequestError(r.status_code, r.text)
        return r.json()

    def overwrite_file(self, node_id: str, file_name: str,
                       read_callbacks: list = None, deduplication=False) -> dict:
        """Replace the content of node *node_id* with the local file *file_name*.

        :param node_id: ID of the file node to overwrite
        :param file_name: path of the local file to upload
        :param read_callbacks: callables invoked with each chunk read from the file
        :param deduplication: if false, ask the server to suppress deduplication
        :returns: the decoded JSON response
        :raises RequestError: if the response status is not in ``OK_CODES``"""
        params = {} if deduplication else {'suppress': 'deduplication'}
        basename = os.path.basename(file_name)
        mime_type = _get_mimetype(basename)
        f = _tee_open(file_name, callbacks=read_callbacks)
        try:
            # basename is ignored (per original note); a list of field tuples, not a set
            m = MultipartEncoder(fields=[('content', (quote_plus(basename), f, mime_type))])
            r = self.BOReq.put(self.content_url + 'nodes/' + node_id + '/content',
                               params=params, data=m, stream=True,
                               headers={'Content-Type': m.content_type})
        finally:
            # put() has returned a response, so the body was sent; release the handle
            f.close()
        if r.status_code not in OK_CODES:
            raise RequestError(r.status_code, r.text)
        return r.json()

    def overwrite_stream(self, stream, node_id: str, read_callbacks: list = None) -> dict:
        """Overwrite content of node with ID *node_id* with content of *stream*.

        :param stream: readable object
        :param read_callbacks: callables invoked with each chunk read from *stream*
        :returns: the decoded JSON response
        :raises RequestError: if the response status is not in ``OK_CODES``"""
        metadata = {}
        import uuid
        boundary = uuid.uuid4().hex
        r = self.BOReq.put(self.content_url + 'nodes/' + node_id + '/content',
                           data=_multipart_stream(metadata, stream, boundary, read_callbacks),
                           headers={'Content-Type': 'multipart/form-data; boundary=%s'
                                                    % boundary})
        if r.status_code not in OK_CODES:
            raise RequestError(r.status_code, r.text)
        return r.json()

    def download_file(self, node_id: str, basename: str, dirname: str = None, **kwargs):
        """Deals with download preparation, download with :func:`chunked_download` and finish.

        Calls callbacks while fast forwarding through incomplete file (if existent).
        Will not check for existing file prior to download and overwrite existing file on finish.

        :param node_id: ID of the file node to download
        :param dirname: a valid local directory name, or cwd if None
        :param basename: a valid file name
        :param kwargs: \
            - length: the total length of the file
            - write_callbacks (list[function]): passed on to :func:`chunked_download`
            - resume (bool=True): whether to resume if partial file exists
        :raises RequestError: INCOMPLETE_RESULT if *length* is positive and fewer
            bytes than *length* ended up in the partial file"""
        dl_path = os.path.join(dirname, basename) if dirname else basename
        part_path = dl_path + PARTIAL_SUFFIX
        length = kwargs.get('length', 0)
        resume = kwargs.get('resume', True)

        if resume and os.path.isfile(part_path):
            # Cut off the tail of the partial file (one FS chunk plus a byte),
            # presumably to discard a possibly incomplete last write.
            with open(part_path, 'ab') as f:
                trunc_pos = os.path.getsize(part_path) - 1 - FS_RW_CHUNK_SZ
                f.truncate(max(trunc_pos, 0))
            write_callbacks = kwargs.get('write_callbacks')
            if write_callbacks:
                # Replay the retained data so the callbacks see the whole file.
                with open(part_path, 'rb') as f:
                    for chunk in iter(lambda: f.read(FS_RW_CHUNK_SZ), b''):
                        for rcb in write_callbacks:
                            rcb(chunk)
            mode = 'ab'
        else:
            mode = 'wb'

        # 'with' closes the handle even if chunked_download raises.
        with open(part_path, mode) as f:
            offset = f.tell()  # append mode starts at EOF, i.e. the resume offset
            self.chunked_download(node_id, f, offset=offset, **kwargs)
            pos = f.tell()

        if length > 0 and pos < length:
            raise RequestError(RequestError.CODE.INCOMPLETE_RESULT,
                               '[acd_api] download incomplete.')

        if os.path.isfile(dl_path):
            logger.info('Deleting existing file "%s".', dl_path)
        # os.replace overwrites an existing destination on all platforms, so there
        # is no window in which dl_path is missing (atomic on POSIX).
        os.replace(part_path, dl_path)

    @catch_conn_exception
    def chunked_download(self, node_id: str, file: io.BufferedWriter, **kwargs):
        """Download a node's content into *file* with ranged requests of CHUNK_SIZE bytes.

        :param kwargs:
            offset (int): byte offset -- start byte for ranged request
            length (int): total file length[!], equal to end + 1
            write_callbacks (list[function]): called with every chunk written
        :raises RequestError: FAILED_SUBREQUEST after CHUNK_MAX_RETRY consecutive
            failed attempts for the same range
        """
        ok_codes = [http.PARTIAL_CONTENT]
        # tolerate an explicit write_callbacks=None (download_file accepts it)
        write_callbacks = kwargs.get('write_callbacks') or []
        chunk_start = kwargs.get('offset', 0)
        # without a length, use a huge upper bound and stop on 416 instead
        length = kwargs.get('length', 100 * 1024 ** 4)
        retries = 0
        while chunk_start < length:
            chunk_end = min(chunk_start + CHUNK_SIZE, length) - 1
            if retries >= CHUNK_MAX_RETRY:
                raise RequestError(RequestError.CODE.FAILED_SUBREQUEST,
                                   '[acd_api] Downloading chunk failed multiple times.')
            r = self.BOReq.get(self.content_url + 'nodes/' + node_id + '/content', stream=True,
                               acc_codes=ok_codes,
                               headers={'Range': 'bytes=%d-%d' % (chunk_start, chunk_end)})
            logger.debug('Range %d-%d', chunk_start, chunk_end)
            # this should only happen at the end of unknown-length downloads
            if r.status_code == http.REQUESTED_RANGE_NOT_SATISFIABLE:
                r.close()
                logger.debug('Invalid byte range requested %d-%d', chunk_start, chunk_end)
                break
            if r.status_code not in ok_codes:
                r.close()
                retries += 1
                logger.debug('Chunk [%d-%d], retry %d.', chunk_start, chunk_end, retries)
                continue
            curr_ln = 0
            try:
                for chunk in r.iter_content(chunk_size=FS_RW_CHUNK_SZ):
                    if chunk:  # filter out keep-alive new chunks
                        file.write(chunk)
                        file.flush()
                        for wcb in write_callbacks:
                            wcb(chunk)
                        curr_ln += len(chunk)
            finally:
                r.close()
            if not curr_ln:
                # no progress: count as a failed attempt instead of looping forever
                retries += 1
                logger.debug('Chunk [%d-%d] returned no data, retry %d.',
                             chunk_start, chunk_end, retries)
                continue
            # Advance by the bytes actually received; a short response is then
            # continued from where it stopped instead of leaving a gap in *file*.
            chunk_start += curr_ln
            retries = 0

    def response_chunk(self, node_id: str, offset: int, length: int, **kwargs) -> Response:
        """Request bytes ``offset`` to ``offset + length - 1`` of a node's content.

        The request is made with ``stream=True``; the caller should close the response.

        :param kwargs: passed on to ``BOReq.get``
        :returns: the streaming 206 Partial Content response
        :raises RequestError: if the response status is not 206"""
        ok_codes = [http.PARTIAL_CONTENT]
        end = offset + length - 1
        logger.debug('chunk o %d l %d', offset, length)
        r = self.BOReq.get(self.content_url + 'nodes/' + node_id + '/content',
                           acc_codes=ok_codes, stream=True,
                           headers={'Range': 'bytes=%d-%d' % (offset, end)}, **kwargs)
        if r.status_code not in ok_codes:
            raise RequestError(r.status_code, r.text)
        return r

    def download_chunk(self, node_id: str, offset: int, length: int, **kwargs) -> bytearray:
        """Load a file chunk into memory.

        :param offset: start byte of the chunk
        :param length: the length of the download chunk
        :returns: the downloaded bytes"""
        r = self.response_chunk(node_id, offset, length, **kwargs)
        if not r:
            # defensive only: response_chunk already raises on non-206 statuses
            return
        buffer = bytearray()
        try:
            for chunk in r.iter_content(chunk_size=FS_RW_CHUNK_SZ):
                if chunk:
                    buffer.extend(chunk)
        finally:
            r.close()
        return buffer

    def download_thumbnail(self, node_id: str, file_name: str, max_dim=128):
        """Download a movie's or picture's thumbnail into a file.

        Officially supports the image formats JPEG, BMP, PNG, TIFF, some RAW formats
        and the video formats MP4, QuickTime, AVI, MTS, MPEG, ASF, WMV, FLV, OGG.
        See http://www.amazon.com/gp/help/customer/display.html?nodeId=201634590
        Additionally supports MKV.

        :param file_name: local path the thumbnail is written to (overwritten)
        :param max_dim: maximum width or height of the resized image/video thumbnail
        :raises RequestError: if the response status is not in ``OK_CODES``
        """
        r = self.BOReq.get(self.content_url + 'nodes/' + node_id + '/content',
                           params={'viewBox': max_dim}, stream=True)
        try:
            if r.status_code not in OK_CODES:
                raise RequestError(r.status_code, r.text)
            with open(file_name, 'wb') as f:
                # iter_content, unlike r.raw.read(), decodes any Content-Encoding
                for chunk in r.iter_content(chunk_size=FS_RW_CHUNK_SZ):
                    f.write(chunk)
        finally:
            r.close()