storage_backend/storage_file/models/storage_file.py

239 lines
7.7 KiB
Python
Raw Permalink Normal View History

2024-11-08 03:21:56 +07:00
# Copyright 2017 Akretion (http://www.akretion.com).
# @author Sébastien BEAU <sebastien.beau@akretion.com>
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
import base64
import hashlib
import logging
import mimetypes
import os
import re
from odoo import api, fields, models
from odoo.exceptions import UserError
from odoo.tools import human_size
from odoo.tools.translate import _
_logger = logging.getLogger(__name__)
try:
from slugify import slugify
except ImportError: # pragma: no cover
_logger.debug("Cannot `import slugify`.")
REGEX_SLUGIFY = r"[^-a-z0-9_]+"
class StorageFile(models.Model):
_name = "storage.file"
_description = "Storage File"
name = fields.Char(required=True, index=True)
backend_id = fields.Many2one(
"storage.backend", "Storage", index=True, required=True
)
url = fields.Char(compute="_compute_url", help="HTTP accessible path to the file")
url_path = fields.Char(
compute="_compute_url_path", help="Accessible path, no base URL"
)
internal_url = fields.Char(
compute="_compute_internal_url",
help="HTTP URL to load the file directly from storage.",
)
slug = fields.Char(
compute="_compute_slug", help="Slug-ified name with ID for URL", store=True
)
relative_path = fields.Char(
readonly=True, help="Relative location for backend", copy=False
)
file_size = fields.Integer("File Size")
human_file_size = fields.Char(
"Human File Size", compute="_compute_human_file_size", store=True
)
checksum = fields.Char("Checksum/SHA1", size=40, index=True, readonly=True)
filename = fields.Char(
"Filename without extension", compute="_compute_extract_filename", store=True
)
extension = fields.Char(
"Extension", compute="_compute_extract_filename", store=True
)
mimetype = fields.Char("Mime Type", compute="_compute_extract_filename", store=True)
data = fields.Binary(
help="Data",
inverse="_inverse_data",
compute="_compute_data",
store=False,
copy=True,
)
to_delete = fields.Boolean()
active = fields.Boolean(default=True)
company_id = fields.Many2one(
"res.company", "Company", default=lambda self: self.env.user.company_id.id
)
file_type = fields.Selection([])
_sql_constraints = [
(
"path_uniq",
"unique(relative_path, backend_id)",
"The private path must be uniq per backend",
)
]
def write(self, vals):
if "data" in vals:
for record in self:
if record.data:
raise UserError(
_("File can not be updated," "remove it and create a new one")
)
return super(StorageFile, self).write(vals)
@api.depends("file_size")
def _compute_human_file_size(self):
for record in self:
record.human_file_size = human_size(record.file_size)
@api.depends("filename", "extension")
def _compute_slug(self):
for record in self:
record.slug = record._slugify_name_with_id()
def _slugify_name_with_id(self):
return "{}{}".format(
slugify(
"{}-{}".format(self.filename, self.id), regex_pattern=REGEX_SLUGIFY
),
self.extension,
)
def _build_relative_path(self, checksum):
self.ensure_one()
strategy = self.sudo().backend_id.filename_strategy
if not strategy:
raise UserError(
_(
"The filename strategy is empty for the backend %s.\n"
"Please configure it"
)
% self.backend_id.name
)
if strategy == "hash":
return checksum[:2] + "/" + checksum
elif strategy == "name_with_id":
return self.slug
def _prepare_meta_for_file(self):
bin_data = base64.b64decode(self.data)
checksum = hashlib.sha1(bin_data).hexdigest()
relative_path = self._build_relative_path(checksum)
return {
"checksum": checksum,
"file_size": len(bin_data),
"relative_path": relative_path,
}
def _inverse_data(self):
for record in self:
record.write(record._prepare_meta_for_file())
record.backend_id.sudo().add(
record.relative_path,
record.data,
mimetype=record.mimetype,
binary=False,
)
def _compute_data(self):
for rec in self:
if self._context.get("bin_size"):
rec.data = rec.file_size
elif rec.relative_path:
rec.data = rec.backend_id.sudo().get(rec.relative_path, binary=False)
else:
rec.data = None
@api.depends("relative_path", "backend_id")
def _compute_url(self):
for record in self:
record.url = record._get_url()
@api.depends("relative_path", "backend_id")
def _compute_url_path(self):
# Keep this separated from `url` to avoid multiple compute:
# you'll need either one or the other.
for record in self:
record.url_path = record._get_url(exclude_base_url=True)
def _get_url(self, exclude_base_url=False):
"""Retrieve file URL based on backend params.
:param exclude_base_url: skip base_url
"""
return self.backend_id._get_url_for_file(
self, exclude_base_url=exclude_base_url
)
@api.depends("slug")
def _compute_internal_url(self):
for record in self:
record.internal_url = record._get_internal_url()
def _get_internal_url(self):
"""Retrieve file URL to load file directly from the storage.
It is recommended to use this for Odoo backend internal usage
to not generate traffic on external services.
"""
return f"/storage.file/{self.slug}"
@api.depends("name")
def _compute_extract_filename(self):
for rec in self:
if rec.name:
rec.filename, rec.extension = os.path.splitext(rec.name)
mime, __ = mimetypes.guess_type(rec.name)
else:
rec.filename = rec.extension = mime = False
rec.mimetype = mime
def unlink(self):
if self._context.get("cleanning_storage_file"):
super(StorageFile, self).unlink()
else:
self.write({"to_delete": True, "active": False})
return True
@api.model
def _clean_storage_file(self):
# we must be sure that all the changes are into the DB since
# we by pass the ORM
self.flush()
self._cr.execute(
"""SELECT id
FROM storage_file
WHERE to_delete=True FOR UPDATE"""
)
ids = [x[0] for x in self._cr.fetchall()]
for st_file in self.browse(ids):
st_file.backend_id.sudo().delete(st_file.relative_path)
st_file.with_context(cleanning_storage_file=True).unlink()
# commit is required since the backend could be an external system
# therefore, if the record is deleted on the external system
# we must be sure that the record is also deleted into Odoo
st_file._cr.commit()
@api.model
def get_from_slug_name_with_id(self, slug_name_with_id):
"""
Return a browse record from a string generated by the method
_slugify_name_with_id
:param slug_name_with_id:
:return: a BrowseRecord (could be empty...)
"""
# id is the last group of digit after '-'
_id = re.findall(r"-([0-9]+)", slug_name_with_id)[-1:]
if _id:
_id = int(_id[0])
return self.browse(_id)