# Copyright 2017 Akretion (http://www.akretion.com). # @author Sébastien BEAU # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl). import base64 import hashlib import logging import mimetypes import os import re from odoo import api, fields, models from odoo.exceptions import UserError from odoo.tools import human_size from odoo.tools.translate import _ _logger = logging.getLogger(__name__) try: from slugify import slugify except ImportError: # pragma: no cover _logger.debug("Cannot `import slugify`.") REGEX_SLUGIFY = r"[^-a-z0-9_]+" class StorageFile(models.Model): _name = "storage.file" _description = "Storage File" name = fields.Char(required=True, index=True) backend_id = fields.Many2one( "storage.backend", "Storage", index=True, required=True ) url = fields.Char(compute="_compute_url", help="HTTP accessible path to the file") url_path = fields.Char( compute="_compute_url_path", help="Accessible path, no base URL" ) internal_url = fields.Char( compute="_compute_internal_url", help="HTTP URL to load the file directly from storage.", ) slug = fields.Char( compute="_compute_slug", help="Slug-ified name with ID for URL", store=True ) relative_path = fields.Char( readonly=True, help="Relative location for backend", copy=False ) file_size = fields.Integer("File Size") human_file_size = fields.Char( "Human File Size", compute="_compute_human_file_size", store=True ) checksum = fields.Char("Checksum/SHA1", size=40, index=True, readonly=True) filename = fields.Char( "Filename without extension", compute="_compute_extract_filename", store=True ) extension = fields.Char( "Extension", compute="_compute_extract_filename", store=True ) mimetype = fields.Char("Mime Type", compute="_compute_extract_filename", store=True) data = fields.Binary( help="Data", inverse="_inverse_data", compute="_compute_data", store=False, copy=True, ) to_delete = fields.Boolean() active = fields.Boolean(default=True) company_id = fields.Many2one( "res.company", "Company", default=lambda self: self.env.user.company_id.id ) file_type = fields.Selection([]) _sql_constraints = [ ( "path_uniq", "unique(relative_path, backend_id)", "The private path must be uniq per backend", ) ] def write(self, vals): if "data" in vals: for record in self: if record.data: raise UserError( _("File can not be updated," "remove it and create a new one") ) return super(StorageFile, self).write(vals) @api.depends("file_size") def _compute_human_file_size(self): for record in self: record.human_file_size = human_size(record.file_size) @api.depends("filename", "extension") def _compute_slug(self): for record in self: record.slug = record._slugify_name_with_id() def _slugify_name_with_id(self): return "{}{}".format( slugify( "{}-{}".format(self.filename, self.id), regex_pattern=REGEX_SLUGIFY ), self.extension, ) def _build_relative_path(self, checksum): self.ensure_one() strategy = self.sudo().backend_id.filename_strategy if not strategy: raise UserError( _( "The filename strategy is empty for the backend %s.\n" "Please configure it" ) % self.backend_id.name ) if strategy == "hash": return checksum[:2] + "/" + checksum elif strategy == "name_with_id": return self.slug def _prepare_meta_for_file(self): bin_data = base64.b64decode(self.data) checksum = hashlib.sha1(bin_data).hexdigest() relative_path = self._build_relative_path(checksum) return { "checksum": checksum, "file_size": len(bin_data), "relative_path": relative_path, } def _inverse_data(self): for record in self: record.write(record._prepare_meta_for_file()) record.backend_id.sudo().add( record.relative_path, record.data, mimetype=record.mimetype, binary=False, ) def _compute_data(self): for rec in self: if self._context.get("bin_size"): rec.data = rec.file_size elif rec.relative_path: rec.data = rec.backend_id.sudo().get(rec.relative_path, binary=False) else: rec.data = None @api.depends("relative_path", "backend_id") def _compute_url(self): for record in self: record.url = record._get_url() @api.depends("relative_path", "backend_id") def _compute_url_path(self): # Keep this separated from `url` to avoid multiple compute: # you'll need either one or the other. for record in self: record.url_path = record._get_url(exclude_base_url=True) def _get_url(self, exclude_base_url=False): """Retrieve file URL based on backend params. :param exclude_base_url: skip base_url """ return self.backend_id._get_url_for_file( self, exclude_base_url=exclude_base_url ) @api.depends("slug") def _compute_internal_url(self): for record in self: record.internal_url = record._get_internal_url() def _get_internal_url(self): """Retrieve file URL to load file directly from the storage. It is recommended to use this for Odoo backend internal usage to not generate traffic on external services. """ return f"/storage.file/{self.slug}" @api.depends("name") def _compute_extract_filename(self): for rec in self: if rec.name: rec.filename, rec.extension = os.path.splitext(rec.name) mime, __ = mimetypes.guess_type(rec.name) else: rec.filename = rec.extension = mime = False rec.mimetype = mime def unlink(self): if self._context.get("cleanning_storage_file"): super(StorageFile, self).unlink() else: self.write({"to_delete": True, "active": False}) return True @api.model def _clean_storage_file(self): # we must be sure that all the changes are into the DB since # we by pass the ORM self.flush() self._cr.execute( """SELECT id FROM storage_file WHERE to_delete=True FOR UPDATE""" ) ids = [x[0] for x in self._cr.fetchall()] for st_file in self.browse(ids): st_file.backend_id.sudo().delete(st_file.relative_path) st_file.with_context(cleanning_storage_file=True).unlink() # commit is required since the backend could be an external system # therefore, if the record is deleted on the external system # we must be sure that the record is also deleted into Odoo st_file._cr.commit() @api.model def get_from_slug_name_with_id(self, slug_name_with_id): """ Return a browse record from a string generated by the method _slugify_name_with_id :param slug_name_with_id: :return: a BrowseRecord (could be empty...) """ # id is the last group of digit after '-' _id = re.findall(r"-([0-9]+)", slug_name_with_id)[-1:] if _id: _id = int(_id[0]) return self.browse(_id)