Source code for pyforms_web.controls.control_querylist

import locale, csv, itertools, simplejson, datetime

from decimal import Decimal
from django.db import models
from django.apps import apps
from django.db.models import Q
from django.utils import timezone
from django.utils.html import strip_tags
from django.http import HttpResponse
from django.db.models.fields.files import FieldFile
from pyforms_web.web.middleware import PyFormsMiddleware
from pyforms_web.controls.control_base import ControlBase
from pyforms_web.utils import get_lookup_verbose_name, get_lookup_value, get_lookup_field


def format_list_column(col_value, raw=False):
    if col_value is None:
        return ''

    if type(col_value).__name__ == "ManyRelatedManager":
        # format a ManyToManyField for LIST_DISPLAY
        # TODO should we limit when there are a lot of objects?
        # or should the user remove that column from LIST_DISPLAY?
        objects = [str(obj) for obj in col_value.all()]
        if raw:
            return ", ".join(objects)
        else:
            return "<br>".join(objects)

    if callable(col_value):
        col_value = col_value()

    if isinstance(col_value, datetime.datetime):
        if not col_value: return ''
        col_value = timezone.localtime(col_value)
        return col_value.strftime('%Y-%m-%d %H:%M')
    elif isinstance(col_value, datetime.date):
        if not col_value: return ''
        return col_value.strftime('%Y-%m-%d')
    elif isinstance(col_value, bool):
        if raw:
            return str(col_value)
        else:
            return '<i class="check circle green icon"></i>' if col_value else '<i class="minus circle red icon"></i>'
    elif isinstance(col_value, int):
        return locale.format("%d", col_value, grouping=True)
    elif isinstance(col_value, float):
        return locale.format("%f", col_value, grouping=True)
    elif isinstance(col_value, Decimal):
        return '{0:n}'.format(col_value)
    elif type(col_value).__name__ == 'Money':
        # support django-money MoneyField
        if raw:
            return str(col_value)
        else:
            return '<div style="text-align: right; margin-right: .5rem;">%s</div>' % col_value
    elif isinstance(col_value, FieldFile):
        if raw:
            return str(col_value.name)
        try:
            return '<a href="{0}" target="_blank" click="return false;" >{1}</a>'.format(col_value.url, col_value.name)
        except ValueError:
            return ''
    elif isinstance(col_value, models.Model):
        return col_value.__str__()
    elif callable(col_value):
        v = col_value()

        if hasattr(col_value, 'boolean') and getattr(col_value, 'boolean'):
            if not raw:
                v = '<i class="check circle green icon"></i>' if v else '<i class="minus circle red icon"></i>'

        return '' if v is None else str(v)
    else:
        return col_value

[docs]class ControlQueryList(ControlBase): def __init__(self, *args, **kwargs): self.rows_per_page = kwargs.get('rows_per_page', 10) self.n_pages = kwargs.get('n_pages', 5) self._current_page = 1 self.headers = kwargs.get('headers', None) self.list_display = kwargs.get('list_display', []) self.list_filter = kwargs.get('list_filter', []) self.search_fields = kwargs.get('search_fields', []) self.export_csv = kwargs.get('export_csv', False) self.export_csv_columns = kwargs.get('export_csv_columns', self.list_display) self.export_csv_headers = kwargs.get('export_csv_headers', {}) self._columns_size = kwargs.get('columns_size', None) self._columns_align = kwargs.get('columns_align', None) self.item_selection_changed_event = kwargs.get('item_selection_changed_event', self.item_selection_changed_event) self.filter_event = kwargs.get('filter_event', self.filter_event) self.page_event = kwargs.get('page_event', self.page_event) self.sort_event = kwargs.get('sort_event', self.sort_event) self.search_field_key = None self.filter_by = [] self.sort_by = [] self._selected_row_id = -1 #row selected by the mouse self.custom_filter_labels = {} # these informations is needed to serialize the control to the drive self._app = None self._model = None self._query = None self._update_list = True #used to update the list to the client #################################################################### super(ControlQueryList, self).__init__(*args, **kwargs) def init_form(self): return "new ControlQueryList('{0}', {1})".format( self._name, simplejson.dumps(self.serialize(init_form=True)) )
[docs] def item_selection_changed_client_event(self): self.mark_to_update_client() # what are the implications of enabling this??? self.item_selection_changed_event()
[docs] def item_selection_changed_event(self): pass
def __get_pages_2_show(self, queryset): if not queryset: return [] total_rows = queryset.count() total_n_pages = (total_rows / self.rows_per_page) + (0 if (total_rows % self.rows_per_page)==0 else 1) start_page = self._current_page - self.n_pages/2 end_page = self._current_page + self.n_pages/2 if start_page<1: diff = 1 - start_page start_page = 1 end_page += diff if end_page>total_n_pages: end_page = total_n_pages if ( end_page-(self.n_pages) )>=1: start_page = (end_page-(self.n_pages-1)) return [int(start_page-1) if int(start_page)>1 else -1] + list(range(int(start_page), int(end_page)+1)) + [ int(end_page+1) if int(end_page)<int(total_n_pages) else -1]
[docs] def export_csv_event(self): """ Event called to export the queryset to excel """ self.parent.execute_js( """window.open('/pyforms/export-csv/{0}/{1}/');""".format(self.parent.uid, self.name) )
[docs] def export_csv_http_response(self): response = HttpResponse(content_type='text/csv') response['Content-Disposition'] = 'attachment; filename="{0}.csv"'.format(timezone.now().isoformat()) writer = csv.writer(response, delimiter=";") queryset = self.value headers = [] for column_name in self.export_csv_columns: try: header = self.export_csv_headers[column_name] except KeyError: header = get_lookup_verbose_name(queryset.model, column_name) headers.append(header) writer.writerow(headers) for o in queryset: row = [ strip_tags(format_list_column(get_lookup_value(o, col), raw=True)) for col in self.export_csv_columns ] writer.writerow(row) return response
@property def export_csv_columns(self): """ Sets and gets the list of columns to be used in the cvs export By default it will assume the self.list_display value """ return self._export_csv_columns @export_csv_columns.setter def export_csv_columns(self, value): self._export_csv_columns = value @property def export_csv(self): """ Flag to activate or deactivate the csv export button """ return self._export_csv @export_csv.setter def export_csv(self, value): self._export_csv = value @property def selected_row_id(self): return self._selected_row_id @selected_row_id.setter def selected_row_id(self, value): self._selected_row_id = value @property def columns_size(self): return self._columns_size @columns_size.setter def columns_size(self, value): self.mark_to_update_client() self._columns_size = value @property def columns_align(self): return self._columns_align @columns_align.setter def columns_align(self, value): self.mark_to_update_client() self._columns_align = value @property def value(self): if self._app and self._model and self._query: # reconstruct the query ################################ model = apps.get_model(self._app, self._model) qs = model.objects.all() qs.query = self._query # apply filters for f in self.filter_by: qs = qs.filter(**f) # apply search keys if self.search_field_key and len(self.search_field_key)>0: search_filter = None for s in self.search_fields: keys_filter = None for key in self.search_field_key.split(): q = Q(**{s: key}) keys_filter = (keys_filter & q) if keys_filter else q search_filter = (search_filter | keys_filter) if search_filter else keys_filter qs = qs.filter(search_filter) # apply orders by if len(self.sort_by)>0: for sort in self.sort_by: direction = '-' if sort['desc'] else '' qs = qs.order_by( direction+sort['column'] ) # if no order by exists add one, to avoid the values to be show randomly in the list order_by = list(qs.query.order_by) if 'pk' not in order_by or '-pk' not in order_by: order_by.append('-pk') qs = qs.order_by( *order_by ) return qs.distinct() else: return None @value.setter def value(self, value): if self._query!=value.query: if value is not None: if len(value.query.order_by)==0 and value.model._meta.ordering: value = value.order_by(*value.model._meta.ordering) self._model = value.model._meta.label.split('.')[-1] self._query = value.query self._app = value.model._meta.app_label self._selected_row_id = -1 self._current_page = 1 self.mark_to_update_client() self.changed_event() def serialize(self, init_form=False): data = ControlBase.serialize(self) queryset = self.value rows = [] if self._update_list and queryset: row_start = self.rows_per_page*(self._current_page-1) row_end = self.rows_per_page*(self._current_page) rows = self.queryset_to_list(queryset, self.list_display, row_start, row_end) if init_form: filters_list = self.serialize_filters(self.list_filter, queryset) data.update({ 'filters_list': filters_list }); if init_form and self.list_display and (queryset or self.headers): #configure the headers titles headers = [] if self.headers is None: for column_name in self.list_display: label = get_lookup_verbose_name(queryset.model, column_name) headers.append({ 'label': label, 'column': column_name }) else: for label, column_name in itertools.zip_longest(self.headers, self.list_display): headers.append({ 'label': label, 'column': column_name }) data.update({ 'horizontal_headers': headers, }); if len(self.search_fields)>0: data.update({'search_field_key': self.search_field_key if self.search_field_key is not None else ''}) total_rows = queryset.count() if queryset else 0 total_n_pages = (total_rows / self.rows_per_page) + (0 if (total_rows % self.rows_per_page)==0 else 1) data.update({ 'columns_align': self.columns_align, 'columns_size': self.columns_size, 'export_csv': self.export_csv, 'filter_by': self.filter_by, 'sort_by': self.sort_by, 'pages': {'current_page': self._current_page, 'pages_list':self.__get_pages_2_show(queryset) }, 'pages_total': total_n_pages, 'value': rows, 'values_total': total_rows, 'selected_row_id': self._selected_row_id }) return data
[docs] def page_changed_event(self): self.page_event() self._selected_row_id = -1 self.mark_to_update_client()
[docs] def sort_changed_event(self): self.sort_event() self._selected_row_id = -1 self.mark_to_update_client()
[docs] def filter_changed_event(self): self.filter_event() self._selected_row_id = -1 self._current_page = 1 self.mark_to_update_client()
[docs] def filter_event(self):pass
[docs] def page_event(self):pass
[docs] def sort_event(self):pass
##################################################################### #####################################################################
[docs] def format_filter_column(self, col_value): if isinstance(col_value, datetime.datetime): if not col_value: return '' col_value = timezone.localtime(col_value) return col_value.strftime('%Y-%m-%d %H:%M') elif isinstance(col_value, datetime.date): if not col_value: return '' return col_value.strftime('%Y-%m-%d') elif isinstance(col_value, bool): return '<i class="check circle green icon"></i>' if col_value else '<i class="minus circle red icon"></i>' elif isinstance(col_value, models.Model): return col_value.__str__() elif callable(col_value): return str(col_value()) else: return col_value
[docs] def queryset_to_list(self, queryset, list_display, first_row, last_row): if not list_display: return [ [m.pk, str(m)] for m in queryset[first_row:last_row] ] else: queryset = queryset.distinct() #queryset = queryset.order_by(*queryset.query.order_by) rows = [] for o in queryset[first_row:last_row]: row = [o.pk] + [format_list_column(get_lookup_value(o, col)) for col in list_display] rows.append(row) return rows
""" def get_datetimefield_options(self, column_name): column_filter = "{0}__gte".format(column_name) now = timezone.now() today_begin = now.replace(hour=0, minute=0, second=0, microsecond=0) today_end = now.replace(hour=23, minute=59, second=59, microsecond=999) next_4_months = today_end + timedelta(days=4*30) month_begin = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) month_end = now.replace(day=monthrange(now.year, now.month)[1], hour=23, minute=59, second=59, microsecond=999) return { 'items': [ ("{0}__gte={1}&{0}__lte={2}".format(column_name, today_begin.strftime('%Y-%m-%d'), today_end.strftime('%Y-%m-%d')), 'Today'), ("{0}__gte={1}&{0}__lte={2}".format(column_name, month_begin.strftime('%Y-%m-%d'), month_end.strftime('%Y-%m-%d')), 'This month'), ("{0}__gte={1}&{0}__lte={2}".format(column_name, today_begin.strftime('%Y-%m-%d'), next_4_months.strftime('%Y-%m-%d')), 'Next 4 months'), ("{0}__year={1}".format(column_name, now.year), 'This year') ] } """ def deserialize(self, properties): self._label = properties.get('label','') self._help = properties.get('help','') self._visible = properties.get('visible',True) self.search_field_key = properties.get('search_field_key', None) self.sort_by = properties.get('sort_by', []) self.filter_by = properties.get('filter_by',[]) self._current_page = int(properties['pages']['current_page']) self._selected_row_id = properties.get('selected_row_id', -1)
[docs] def serialize_filters(self, list_filter, queryset): filters_list = [] model = queryset.model #configure the filters for column_name in list_filter: order_by = column_name column_name = column_name[1:] if column_name.startswith('-') else column_name field = get_lookup_field(model, column_name) if field is None: continue field_properties = { 'field_type': 'combo', 'label': self.custom_filter_labels.get( column_name, get_lookup_verbose_name(model, column_name), ), 'column': column_name } if isinstance(field, models.BooleanField): field_properties.update({ 'items': [ ("{0}=true".format(column_name), 'Yes'), ("{0}=false".format(column_name), 'No')] }) elif isinstance(field, models.Field) and field.choices: field_properties.update({ 'items': [ ("{0}={1}".format(column_name, c[0]),c[1]) for c in field.choices] }) elif isinstance(field, (models.DateField, models.DateTimeField) ): #field_properties.update(self.get_datetimefield_options(column_name)) field_properties.update({ 'field_type': 'date-range' }) elif field.is_relation: objects = field.related_model.objects.all() # Apply the field limits choice ################################## limit_choices = field.get_limit_choices_to() if limit_choices: objects = objects.filter(**limit_choices).distinct() ################################################################## # Check if the parent window has a function to filter the related fields if hasattr(self.parent, 'get_related_field_queryset'): objects = self.parent.get_related_field_queryset( PyFormsMiddleware.get_request(), queryset, field, objects ) filter_values = [(column_name+'='+str(o.pk), o.__str__() ) for o in objects] field_properties.update({'items': filter_values}) else: column_values = queryset.values_list(column_name, flat=True).distinct().order_by(order_by) filter_values = [(column_name+'='+str(column_value), column_value) for column_value in column_values] field_properties.update({'items': filter_values}) filters_list.append(field_properties) return filters_list