# Source code for onmydesk.models

"""
Required models to handle and store generated reports.
"""
import base64
import pickle
from datetime import date
from decimal import Decimal, getcontext
from timeit import default_timer as timer

from django import forms
from django.conf import settings
from django.core.mail import send_mail
from django.db import models
from django.template import Context
from django.template.loader import get_template

from . import settings as app_settings
from .managers import SchedulerManager
from .utils import my_import, str_to_date


# Value of the ``ONMYDESK_FILE_HANDLER`` entry of the Django project settings,
# or None when the project does not define it.
# NOTE(review): nothing in the visible part of this module reads this constant;
# ``output_file_handler`` uses ``app_settings.ONMYDESK_FILE_HANDLER`` instead.
ONMYDESK_FILE_HANDLER = getattr(settings, 'ONMYDESK_FILE_HANDLER', None)


class ReportNotSavedException(Exception):
    """Raised by ``Report.process`` when the report has no id (not saved yet)."""


def output_file_handler(filepath):
    """Return the value to store for a file generated by a report.

    When ``app_settings.ONMYDESK_FILE_HANDLER`` is set, the object it names
    is imported with ``my_import`` and called with ``filepath``; its return
    value (a new file path, a URL, ...) is what gets returned. This allows
    report results to be post-processed, e.g. moved to another directory or
    to the cloud. When the setting is empty, ``filepath`` is returned
    untouched.

    :param str filepath: File path of an output generated by a report.
    :returns: File path to the output (processed or not by an external
        handler)
    :rtype: str
    """
    handler_path = app_settings.ONMYDESK_FILE_HANDLER

    if handler_path:
        return my_import(handler_path)(filepath)

    return filepath
class Report(models.Model):
    """Report model to store generated reports."""

    STATUS_PENDING = 'pending'
    STATUS_PROCESSING = 'processing'
    STATUS_PROCESSED = 'processed'
    STATUS_ERROR = 'error'

    STATUS_CHOICES = (
        (STATUS_PENDING, 'Pending'),
        (STATUS_PROCESSING, 'Processing'),
        (STATUS_PROCESSED, 'Processed'),
        (STATUS_ERROR, 'Error'),
    )

    status = models.CharField(max_length=20, choices=STATUS_CHOICES,
                              default=STATUS_PENDING)
    process_time = models.DecimalField(verbose_name='Process time (secs)',
                                       max_digits=10, decimal_places=4,
                                       null=True, blank=True)
    # Base64-encoded pickle of the params dict (see set_params/get_params).
    params = models.BinaryField(verbose_name='Report params', null=True,
                                blank=True)
    # Import path of the report class, resolved with ``my_import``.
    report = models.CharField(max_length=255)
    # Output results joined with ';' (see results_as_list).
    results = models.CharField(max_length=255, null=True, blank=True)

    insert_date = models.DateTimeField('Creation Date', auto_now_add=True)
    update_date = models.DateTimeField('Update Date', auto_now=True)
    created_by = models.ForeignKey(
        settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True)

    def __str__(self):
        if not self.report:
            return 'Report object'

        report_class = my_import(self.report)
        return '{}{}'.format(
            report_class.name,
            ' #{}'.format(self.id) if self.id else '')

    def set_params(self, params):
        """Set params to be used when report is processed.

        The params are pickled and base64-encoded before being assigned to
        the ``params`` field; the model is not saved by this method.

        :param dict params: Dictionary with params to be used to process
            report.
        """
        self.params = base64.b64encode(pickle.dumps(params))

    def get_params(self):
        """Params to be used to process report.

        :return: Report params, or None when no params are stored
        """
        if self.params:
            # SECURITY NOTE: unpickling executes arbitrary code if the stored
            # bytes were tampered with. This is only safe while the ``params``
            # column is written exclusively through ``set_params``.
            return pickle.loads(base64.b64decode(self.params))

        return None

    def process(self):
        """Process this report.

        The status is saved as *processing*, the report class named by
        ``report`` is imported, instantiated with the stored params and
        processed. On success ``process_time``, ``results`` and the
        *processed* status are saved. On any failure the *error* status is
        saved and the original exception is re-raised.

        To access output results is recommended to use
        :func:`results_as_list`.

        :raises ReportNotSavedException: If this report has no id yet.
        """
        if not self.id:
            raise ReportNotSavedException()

        self.status = Report.STATUS_PROCESSING
        self.save(update_fields=['status'])

        try:
            # Import/instantiate inside the try block so that a broken report
            # path marks the row as failed instead of leaving it stuck in
            # the processing status.
            report_class = my_import(self.report)
            report = report_class(params=self.get_params())

            start = Decimal(timer())
            report.process()
            end = Decimal(timer())

            # Round the elapsed time to 5 significant digits using a private
            # copy of the decimal context: changing ``getcontext().prec``
            # directly would alter Decimal arithmetic for the whole thread.
            elapsed_context = getcontext().copy()
            elapsed_context.prec = 5
            self.process_time = elapsed_context.subtract(end, start)

            self.results = ';'.join(
                output_file_handler(filepath)
                for filepath in report.output_filepaths)

            self.status = Report.STATUS_PROCESSED
            # Persist the outcome together with the status; otherwise the row
            # would claim to be processed while holding no results.
            self.save(update_fields=['status', 'process_time', 'results'])
        except Exception:
            self.status = Report.STATUS_ERROR
            self.save(update_fields=['status'])
            raise

    @property
    def result_links(self):
        """Links to access report results.

        Each stored result is passed to the callable named by
        ``app_settings.ONMYDESK_DOWNLOAD_LINK_HANDLER``.

        NOTE(review): when no link handler is configured this returns the
        string ``'#'`` rather than a list; kept as is because callers may
        rely on it.

        :returns: List of links to access results (or ``'#'``, see above)
        :rtype: list
        """
        link_handler = app_settings.ONMYDESK_DOWNLOAD_LINK_HANDLER

        if not link_handler:
            return '#'

        handler = my_import(link_handler)
        return [handler(result) for result in self.results_as_list]

    @property
    def results_as_list(self):
        """Output results stored in this model, split on ';'.

        :returns: List of results (empty when nothing is stored)
        :rtype: list
        """
        if not self.results:
            return []

        return self.results.split(';')
class Scheduler(models.Model):
    """Model used to schedule reports to be generated with some periodicity
    (every monday, from monday to friday, etc.)."""

    objects = SchedulerManager()

    PER_MON_FRI = 'mon_fri'
    PER_MON_SUN = 'mon_sun'
    PER_SUN = 'sun'
    PER_MON = 'mon'
    PER_TUE = 'tue'
    PER_WED = 'wed'
    PER_THU = 'thu'
    PER_FRI = 'fri'
    PER_SAT = 'sat'

    PERIODICITIES = (
        (PER_MON_FRI, 'Monday to Friday'),
        (PER_MON_SUN, 'Monday to Sunday'),
        (PER_SUN, 'Every Sunday'),
        (PER_MON, 'Every Monday'),
        (PER_TUE, 'Every Tuesday'),
        (PER_WED, 'Every Wednesday'),
        (PER_THU, 'Every Thursday'),
        (PER_FRI, 'Every Friday'),
        (PER_SAT, 'Every Saturday'),
    )

    # Periodicities that apply to each weekday number.
    PERIODICITIES_BY_WEEKDAY = {
        # Monday is 0 and Sunday is 6
        0: {PER_MON_SUN, PER_MON_FRI, PER_MON},
        1: {PER_MON_SUN, PER_MON_FRI, PER_TUE},
        2: {PER_MON_SUN, PER_MON_FRI, PER_WED},
        3: {PER_MON_SUN, PER_MON_FRI, PER_THU},
        4: {PER_MON_SUN, PER_MON_FRI, PER_FRI},
        5: {PER_MON_SUN, PER_SAT},
        6: {PER_MON_SUN, PER_SUN},
    }

    # Import path of the report class, resolved with ``my_import``.
    report = models.CharField(max_length=255)
    periodicity = models.CharField(max_length=20, choices=PERIODICITIES)
    # Base64-encoded pickle of the params dict (see set_params/get_params).
    params = models.BinaryField(verbose_name='Parameters', null=True,
                                blank=True)
    notify_emails = models.CharField(
        'E-mail\'s to notify after process (separated by ",")',
        max_length=1000, null=True, blank=True)

    insert_date = models.DateTimeField('Creation Date', auto_now_add=True)
    update_date = models.DateTimeField('Update Date', auto_now=True)
    created_by = models.ForeignKey(
        settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True)

    def __str__(self):
        if not self.report:
            return 'Scheduler object'

        report_class = my_import(self.report)
        return '{}{}'.format(
            report_class.name,
            ' #{}'.format(self.id) if self.id else '')

    def set_params(self, params):
        """Set params to be used when report is processed.

        The params are pickled and base64-encoded before being assigned to
        the ``params`` field; the model is not saved by this method.

        :param dict params: Dictionary with params to be used to process
            report.
        """
        self.params = base64.b64encode(pickle.dumps(params))

    def get_params(self):
        """Params to be used to process report.

        :return: Report params, or None when no params are stored
        """
        if self.params:
            # SECURITY NOTE: unpickling executes arbitrary code if the stored
            # bytes were tampered with. This is only safe while the ``params``
            # column is written exclusively through ``set_params``.
            return pickle.loads(base64.b64decode(self.params))

        return None

    def process(self, reference_date=None):
        """Process scheduler creating and returning a report.

        After processing, this method tries to notify e-mails filled in
        the ``notify_emails`` field.

        :param date reference_date: Date passed on to
            :func:`get_processed_params`
        :returns: Report result
        :rtype: Report
        """
        report = Report(report=self.report,
                        # Avoid other routines to get this report to process
                        status=Report.STATUS_PROCESSING,
                        created_by=self.created_by)
        report.set_params(self.get_processed_params(reference_date))
        # The report needs an id before it can be processed.
        report.save()

        report.process()
        report.save()

        self._notify(report)

        return report

    def get_processed_params(self, reference_date=None):
        """Params to be used to process report.

        Values of params whose name matches a ``DateField`` of the report
        form are converted with ``str_to_date`` using ``reference_date``.

        :param date reference_date: Date to use as reference (today when
            omitted)
        :returns: Dict with params, or None when no params are stored
        :rtype: dict
        """
        reference_date = reference_date or date.today()

        report_class = my_import(self.report)

        params = self.get_params()
        if not params:
            return None

        form = report_class.get_form()
        if not form:
            return params

        for name, field in form.base_fields.items():
            if name in params and isinstance(field, forms.fields.DateField):
                params[name] = str_to_date(params[name], reference_date)

        return params

    # TODO: Move this to a signal handler ('scheduler processed' or something
    # like that).
    def _notify(self, report):
        """Send the scheduler notification e-mail about ``report``.

        Nothing is sent (and no template is loaded) when ``notify_emails``
        holds no address.

        :param Report report: Report produced by this scheduler.
        """
        # Tolerate spaces and stray commas in the stored address list, e.g.
        # 'a@x.com, b@x.com,'.
        destinations = [
            address.strip()
            for address in (self.notify_emails or '').split(',')
            if address.strip()
        ]

        if not destinations:
            return

        # Templates returned by ``get_template`` must be rendered with a
        # plain dict; recent Django versions reject a ``Context`` instance.
        context = {
            'scheduler': self,
            'report': report,
        }

        text_content = get_template(
            'onmydesk/scheduler-notify.txt').render(context)
        html_content = get_template(
            'onmydesk/scheduler-notify.html').render(context)

        send_mail(
            app_settings.ONMYDESK_SCHEDULER_NOTIFY_SUBJECT.format(
                report_name=str(report)),
            text_content,
            app_settings.ONMYDESK_NOTIFY_FROM,
            destinations,
            html_message=html_content)