239 lines
7.7 KiB
Python
239 lines
7.7 KiB
Python
# Copyright 2017 Akretion (http://www.akretion.com).
|
|
# @author Sébastien BEAU <sebastien.beau@akretion.com>
|
|
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
|
|
|
|
import base64
|
|
import hashlib
|
|
import logging
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
|
|
from odoo import api, fields, models
|
|
from odoo.exceptions import UserError
|
|
from odoo.tools import human_size
|
|
from odoo.tools.translate import _
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
from slugify import slugify
|
|
except ImportError: # pragma: no cover
|
|
_logger.debug("Cannot `import slugify`.")
|
|
|
|
|
|
REGEX_SLUGIFY = r"[^-a-z0-9_]+"
|
|
|
|
|
|
class StorageFile(models.Model):
|
|
_name = "storage.file"
|
|
_description = "Storage File"
|
|
|
|
name = fields.Char(required=True, index=True)
|
|
backend_id = fields.Many2one(
|
|
"storage.backend", "Storage", index=True, required=True
|
|
)
|
|
url = fields.Char(compute="_compute_url", help="HTTP accessible path to the file")
|
|
url_path = fields.Char(
|
|
compute="_compute_url_path", help="Accessible path, no base URL"
|
|
)
|
|
internal_url = fields.Char(
|
|
compute="_compute_internal_url",
|
|
help="HTTP URL to load the file directly from storage.",
|
|
)
|
|
slug = fields.Char(
|
|
compute="_compute_slug", help="Slug-ified name with ID for URL", store=True
|
|
)
|
|
relative_path = fields.Char(
|
|
readonly=True, help="Relative location for backend", copy=False
|
|
)
|
|
file_size = fields.Integer("File Size")
|
|
human_file_size = fields.Char(
|
|
"Human File Size", compute="_compute_human_file_size", store=True
|
|
)
|
|
checksum = fields.Char("Checksum/SHA1", size=40, index=True, readonly=True)
|
|
filename = fields.Char(
|
|
"Filename without extension", compute="_compute_extract_filename", store=True
|
|
)
|
|
extension = fields.Char(
|
|
"Extension", compute="_compute_extract_filename", store=True
|
|
)
|
|
mimetype = fields.Char("Mime Type", compute="_compute_extract_filename", store=True)
|
|
data = fields.Binary(
|
|
help="Data",
|
|
inverse="_inverse_data",
|
|
compute="_compute_data",
|
|
store=False,
|
|
copy=True,
|
|
)
|
|
to_delete = fields.Boolean()
|
|
active = fields.Boolean(default=True)
|
|
company_id = fields.Many2one(
|
|
"res.company", "Company", default=lambda self: self.env.user.company_id.id
|
|
)
|
|
file_type = fields.Selection([])
|
|
|
|
_sql_constraints = [
|
|
(
|
|
"path_uniq",
|
|
"unique(relative_path, backend_id)",
|
|
"The private path must be uniq per backend",
|
|
)
|
|
]
|
|
|
|
def write(self, vals):
|
|
if "data" in vals:
|
|
for record in self:
|
|
if record.data:
|
|
raise UserError(
|
|
_("File can not be updated," "remove it and create a new one")
|
|
)
|
|
return super(StorageFile, self).write(vals)
|
|
|
|
@api.depends("file_size")
|
|
def _compute_human_file_size(self):
|
|
for record in self:
|
|
record.human_file_size = human_size(record.file_size)
|
|
|
|
@api.depends("filename", "extension")
|
|
def _compute_slug(self):
|
|
for record in self:
|
|
record.slug = record._slugify_name_with_id()
|
|
|
|
def _slugify_name_with_id(self):
|
|
return "{}{}".format(
|
|
slugify(
|
|
"{}-{}".format(self.filename, self.id), regex_pattern=REGEX_SLUGIFY
|
|
),
|
|
self.extension,
|
|
)
|
|
|
|
def _build_relative_path(self, checksum):
|
|
self.ensure_one()
|
|
strategy = self.sudo().backend_id.filename_strategy
|
|
if not strategy:
|
|
raise UserError(
|
|
_(
|
|
"The filename strategy is empty for the backend %s.\n"
|
|
"Please configure it"
|
|
)
|
|
% self.backend_id.name
|
|
)
|
|
if strategy == "hash":
|
|
return checksum[:2] + "/" + checksum
|
|
elif strategy == "name_with_id":
|
|
return self.slug
|
|
|
|
def _prepare_meta_for_file(self):
|
|
bin_data = base64.b64decode(self.data)
|
|
checksum = hashlib.sha1(bin_data).hexdigest()
|
|
relative_path = self._build_relative_path(checksum)
|
|
return {
|
|
"checksum": checksum,
|
|
"file_size": len(bin_data),
|
|
"relative_path": relative_path,
|
|
}
|
|
|
|
def _inverse_data(self):
|
|
for record in self:
|
|
record.write(record._prepare_meta_for_file())
|
|
record.backend_id.sudo().add(
|
|
record.relative_path,
|
|
record.data,
|
|
mimetype=record.mimetype,
|
|
binary=False,
|
|
)
|
|
|
|
def _compute_data(self):
|
|
for rec in self:
|
|
if self._context.get("bin_size"):
|
|
rec.data = rec.file_size
|
|
elif rec.relative_path:
|
|
rec.data = rec.backend_id.sudo().get(rec.relative_path, binary=False)
|
|
else:
|
|
rec.data = None
|
|
|
|
@api.depends("relative_path", "backend_id")
|
|
def _compute_url(self):
|
|
for record in self:
|
|
record.url = record._get_url()
|
|
|
|
@api.depends("relative_path", "backend_id")
|
|
def _compute_url_path(self):
|
|
# Keep this separated from `url` to avoid multiple compute:
|
|
# you'll need either one or the other.
|
|
for record in self:
|
|
record.url_path = record._get_url(exclude_base_url=True)
|
|
|
|
def _get_url(self, exclude_base_url=False):
|
|
"""Retrieve file URL based on backend params.
|
|
|
|
:param exclude_base_url: skip base_url
|
|
"""
|
|
return self.backend_id._get_url_for_file(
|
|
self, exclude_base_url=exclude_base_url
|
|
)
|
|
|
|
@api.depends("slug")
|
|
def _compute_internal_url(self):
|
|
for record in self:
|
|
record.internal_url = record._get_internal_url()
|
|
|
|
def _get_internal_url(self):
|
|
"""Retrieve file URL to load file directly from the storage.
|
|
|
|
It is recommended to use this for Odoo backend internal usage
|
|
to not generate traffic on external services.
|
|
"""
|
|
return f"/storage.file/{self.slug}"
|
|
|
|
@api.depends("name")
|
|
def _compute_extract_filename(self):
|
|
for rec in self:
|
|
if rec.name:
|
|
rec.filename, rec.extension = os.path.splitext(rec.name)
|
|
mime, __ = mimetypes.guess_type(rec.name)
|
|
else:
|
|
rec.filename = rec.extension = mime = False
|
|
rec.mimetype = mime
|
|
|
|
def unlink(self):
|
|
if self._context.get("cleanning_storage_file"):
|
|
super(StorageFile, self).unlink()
|
|
else:
|
|
self.write({"to_delete": True, "active": False})
|
|
return True
|
|
|
|
@api.model
|
|
def _clean_storage_file(self):
|
|
# we must be sure that all the changes are into the DB since
|
|
# we by pass the ORM
|
|
self.flush()
|
|
self._cr.execute(
|
|
"""SELECT id
|
|
FROM storage_file
|
|
WHERE to_delete=True FOR UPDATE"""
|
|
)
|
|
ids = [x[0] for x in self._cr.fetchall()]
|
|
for st_file in self.browse(ids):
|
|
st_file.backend_id.sudo().delete(st_file.relative_path)
|
|
st_file.with_context(cleanning_storage_file=True).unlink()
|
|
# commit is required since the backend could be an external system
|
|
# therefore, if the record is deleted on the external system
|
|
# we must be sure that the record is also deleted into Odoo
|
|
st_file._cr.commit()
|
|
|
|
@api.model
|
|
def get_from_slug_name_with_id(self, slug_name_with_id):
|
|
"""
|
|
Return a browse record from a string generated by the method
|
|
_slugify_name_with_id
|
|
:param slug_name_with_id:
|
|
:return: a BrowseRecord (could be empty...)
|
|
"""
|
|
# id is the last group of digit after '-'
|
|
_id = re.findall(r"-([0-9]+)", slug_name_with_id)[-1:]
|
|
if _id:
|
|
_id = int(_id[0])
|
|
return self.browse(_id)
|