"""
Required models to handle and store generated reports.
"""
import base64
import pickle
from datetime import date
from decimal import Decimal, getcontext
from timeit import default_timer as timer
from django import forms
from django.conf import settings
from django.core.mail import send_mail
from django.db import models
from django.template import Context
from django.template.loader import get_template
from . import settings as app_settings
from .managers import SchedulerManager
from .utils import my_import, str_to_date
ONMYDESK_FILE_HANDLER = getattr(settings, 'ONMYDESK_FILE_HANDLER', None)
class ReportNotSavedException(Exception):
pass
[docs]def output_file_handler(filepath):
"""
Returns the output filepath (handled or not by an external function).
This function tries to find a function handler in `settings.ONMYDESK_FILE_HANDLER`. It
must receive a filepath and returns a new filepath (or url, e.g.) to be stored in the
report register. It's useful to handle the report results (move to other dirs ou to cloud).
:param str filepath: File path to output generated by report.
:returns: File path to output (processed or not by a external handler)
:rtype: str
"""
function_handler = app_settings.ONMYDESK_FILE_HANDLER
if not function_handler:
return filepath
handler = my_import(function_handler)
return handler(filepath)
[docs]class Report(models.Model):
"""Report model to store generated reports"""
STATUS_PENDING = 'pending'
STATUS_PROCESSING = 'processing'
STATUS_PROCESSED = 'processed'
STATUS_ERROR = 'error'
STATUS_CHOICES = (
(STATUS_PENDING, 'Pending'),
(STATUS_PROCESSING, 'Processing'),
(STATUS_PROCESSED, 'Processed'),
(STATUS_ERROR, 'Error'),
)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default=STATUS_PENDING)
process_time = models.DecimalField(verbose_name='Process time (secs)', max_digits=10,
decimal_places=4, null=True, blank=True)
params = models.BinaryField(verbose_name='Report params', null=True, blank=True)
report = models.CharField(max_length=255)
results = models.CharField(max_length=255, null=True, blank=True)
insert_date = models.DateTimeField('Creation Date', auto_now_add=True)
update_date = models.DateTimeField('Update Date', auto_now=True)
created_by = models.ForeignKey(
settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True)
def __str__(self):
if not self.report:
return 'Report object'
report_class = my_import(self.report)
return '{}{}'.format(
report_class.name,
' #{}'.format(self.id) if self.id else '')
[docs] def set_params(self, params):
"""Set params to be used when report is processed
:param dict params: Dictionary with params to be used to process report.
"""
self.params = base64.b64encode(pickle.dumps(params))
[docs] def get_params(self):
"""Params to be used to process report.
:return: Report params"""
if self.params:
return pickle.loads(base64.b64decode(self.params))
return None
[docs] def process(self):
"""Process this report. After processing the outputs will be stored at `results`.
To access output results is recommended to use :func:`results_as_list`.
"""
if not self.id:
raise ReportNotSavedException()
self.status = Report.STATUS_PROCESSING
self.save(update_fields=['status'])
report_class = my_import(self.report)
report = report_class(params=self.get_params())
try:
getcontext().prec = 5
start = Decimal(timer())
report.process()
self.process_time = Decimal(timer()) - start
results = []
for filepath in report.output_filepaths:
results.append(output_file_handler(filepath))
self.results = ';'.join(results)
self.status = Report.STATUS_PROCESSED
self.save(update_fields=['status'])
except Exception as e:
self.status = Report.STATUS_ERROR
self.save(update_fields=['status'])
raise e
@property
def result_links(self):
"""Returns a list with links to access report results.
:returns: List of links to access results
:rtype: list"""
link_handler = app_settings.ONMYDESK_DOWNLOAD_LINK_HANDLER
if not link_handler:
return '#'
handler = my_import(link_handler)
return [handler(i) for i in self.results_as_list]
@property
def results_as_list(self):
"""Returns a list of output results stored in this model
:returns: List of results
:rtype: list"""
if not self.results:
return []
return self.results.split(';')
[docs]class Scheduler(models.Model):
"""Model used to schedule reports to be generated with some
periodicity (every monday, from monday to friday, etc.)"""
objects = SchedulerManager()
PER_MON_FRI = 'mon_fri'
PER_MON_SUN = 'mon_sun'
PER_SUN = 'sun'
PER_MON = 'mon'
PER_TUE = 'tue'
PER_WED = 'wed'
PER_THU = 'thu'
PER_FRI = 'fri'
PER_SAT = 'sat'
PERIODICITIES = (
(PER_MON_FRI, 'Monday to Friday'),
(PER_MON_SUN, 'Monday to Sunday'),
(PER_SUN, 'Every Sunday'),
(PER_MON, 'Every Monday'),
(PER_TUE, 'Every Tuesday'),
(PER_WED, 'Every Wednesday'),
(PER_THU, 'Every Thursday'),
(PER_FRI, 'Every Friday'),
(PER_SAT, 'Every Saturday'),
)
PERIODICITIES_BY_WEEKDAY = {
# Monday is 0 and Sunday is 6
0: {PER_MON_SUN, PER_MON_FRI, PER_MON},
1: {PER_MON_SUN, PER_MON_FRI, PER_TUE},
2: {PER_MON_SUN, PER_MON_FRI, PER_WED},
3: {PER_MON_SUN, PER_MON_FRI, PER_THU},
4: {PER_MON_SUN, PER_MON_FRI, PER_FRI},
5: {PER_MON_SUN, PER_SAT},
6: {PER_MON_SUN, PER_SUN},
}
report = models.CharField(max_length=255)
periodicity = models.CharField(max_length=20, choices=PERIODICITIES)
params = models.BinaryField(verbose_name='Parameters', null=True, blank=True)
notify_emails = models.CharField('E-mail\'s to notify after process (separated by ",")',
max_length=1000, null=True, blank=True)
insert_date = models.DateTimeField('Creation Date', auto_now_add=True)
update_date = models.DateTimeField('Update Date', auto_now=True)
created_by = models.ForeignKey(
settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True)
def __str__(self):
if not self.report:
return 'Scheduler object'
report_class = my_import(self.report)
return '{}{}'.format(
report_class.name,
' #{}'.format(self.id) if self.id else '')
[docs] def set_params(self, params):
"""Set params to be used when report is processed
:param dict params: Dictionary with params to be used to process report.
"""
self.params = base64.b64encode(pickle.dumps(params))
[docs] def get_params(self):
"""Params to be used to process report.
:return: Report params"""
if self.params:
return pickle.loads(base64.b64decode(self.params))
return None
[docs] def process(self, reference_date=None):
"""Process scheduler creating and returing a report.
After processing, this method tries to notify e-mails filled in notify_emails field.
:returns: Report result
:rtype: Report"""
report = Report(report=self.report,
# Avoid other routines to get this report to process
status=Report.STATUS_PROCESSING,
created_by=self.created_by)
report.set_params(self.get_processed_params(reference_date))
report.save()
report.process()
report.save()
self._notify(report)
return report
[docs] def get_processed_params(self, reference_date=None):
"""Params to be used to process report
:param date reference_date: Date to use as reference
:returns: Dict with params
:rtype: dict
"""
reference_date = reference_date or date.today()
report_class = my_import(self.report)
params = self.get_params()
if not params:
return None
form = report_class.get_form()
if not form:
return params
for name, field in form.base_fields.items():
if name in params and isinstance(field, forms.fields.DateField):
params[name] = str_to_date(params[name], reference_date)
return params
# TODO: Move this to a signal handler ('scheduler processed' or something like that).
def _notify(self, report):
text_template = get_template('onmydesk/scheduler-notify.txt')
html_template = get_template('onmydesk/scheduler-notify.html')
destinations = self.notify_emails.split(',') if self.notify_emails else []
if not destinations:
return
context = dict(
scheduler=self,
report=report,
)
text_content = text_template.render(Context(context))
html_content = html_template.render(Context(context))
send_mail(
app_settings.ONMYDESK_SCHEDULER_NOTIFY_SUBJECT.format(
report_name=str(report)),
text_content,
app_settings.ONMYDESK_NOTIFY_FROM,
self.notify_emails.split(','),
html_message=html_content)