From be7a1ccdd9912e340b50f32a3e0f15c867ce519d Mon Sep 17 00:00:00 2001 From: ashwin1111 Date: Wed, 21 May 2025 22:42:21 +0530 Subject: [PATCH 1/3] feat: Real-time exports, phase 2 - Intacct --- apps/fyle/helpers.py | 23 +++- apps/fyle/signals.py | 29 +++- apps/fyle/tasks.py | 73 +++++----- apps/fyle/views.py | 4 +- apps/internal/tasks.py | 2 +- apps/sage_intacct/queue.py | 8 +- apps/sage_intacct/views.py | 3 +- apps/workspaces/actions.py | 4 +- .../apis/advanced_settings/serializers.py | 3 +- .../apis/advanced_settings/triggers.py | 3 +- ...aceschedule_is_real_time_export_enabled.py | 18 +++ apps/workspaces/models.py | 1 + apps/workspaces/tasks.py | 43 +++--- apps/workspaces/urls.py | 2 - apps/workspaces/views.py | 89 ++++++------ requirements.txt | 2 +- .../reset_db_fixtures/reset_db.sql | 16 ++- tests/test_fyle/fixtures.py | 5 +- tests/test_fyle/test_signals.py | 127 ++++++++++++++++++ tests/test_workspaces/fixtures.py | 3 +- .../test_advanced_settings/fixtures.py | 2 + tests/test_workspaces/test_tasks.py | 9 +- tests/test_workspaces/test_views.py | 75 +++++------ workers/export/worker.py | 7 - 24 files changed, 361 insertions(+), 190 deletions(-) create mode 100644 apps/workspaces/migrations/0045_workspaceschedule_is_real_time_export_enabled.py create mode 100644 tests/test_fyle/test_signals.py diff --git a/apps/fyle/helpers.py b/apps/fyle/helpers.py index f5252c5e..e01d0494 100644 --- a/apps/fyle/helpers.py +++ b/apps/fyle/helpers.py @@ -487,17 +487,21 @@ def get_fund_source(workspace_id: int) -> list[str]: return fund_source -def handle_import_exception(task_log: TaskLog) -> None: +def handle_import_exception(task_log: TaskLog | None) -> None: """ Handle import exception :param task_log: task log :return: None """ error = traceback.format_exc() - task_log.detail = {'error': error} - task_log.status = 'FATAL' - task_log.save() - logger.error('Something unexpected happened workspace_id: %s %s', task_log.workspace_id, task_log.detail) + if task_log: + task_log.detail = {'error': error} + task_log.status = 'FATAL' + task_log.updated_at = datetime.now() + task_log.save(update_fields=['detail', 'status', 'updated_at']) + logger.error('Something unexpected happened workspace_id: %s %s', task_log.workspace_id, task_log.detail) + else: + logger.error('Something unexpected happened %s', error) def assert_valid_request(workspace_id:int, fyle_org_id:str) -> None: @@ -633,3 +637,12 @@ class Meta: model = Expense fields = ['org_id', 'is_skipped', 'updated_at__gte', 'updated_at__lte'] or_fields = ['expense_number', 'employee_name', 'employee_email', 'claim_number'] + + +def update_task_log_post_import(task_log: TaskLog, status: str, message: str = None, error: str = None) -> None: + """Helper function to update task log status and details""" + if task_log: + task_log.status = status + task_log.detail = {"message": message} if message else {"error": error} + task_log.updated_at = datetime.now() + task_log.save(update_fields=['status', 'detail', 'updated_at']) diff --git a/apps/fyle/signals.py b/apps/fyle/signals.py index 0c68fa78..870e95bd 100644 --- a/apps/fyle/signals.py +++ b/apps/fyle/signals.py @@ -4,11 +4,16 @@ from django.dispatch import receiver from django.core.exceptions import ValidationError +from django_q.tasks import async_task + +from fyle_accounting_library.fyle_platform.enums import FundSourceEnum, ExpenseImportSourceEnum, ExpenseStateEnum + from apps.fyle.tasks import re_run_skip_export_rule from apps.sage_intacct.dependent_fields import create_dependent_custom_field_in_fyle from apps.fyle.helpers import connect_to_platform -from apps.fyle.models import DependentFieldSetting, ExpenseFilter +from apps.fyle.models import DependentFieldSetting, ExpenseFilter, ExpenseGroupSettings +from apps.workspaces.models import Configuration logger = logging.getLogger(__name__) logger.level = logging.INFO @@ -70,3 +75,25 @@ def run_post_save_expense_filters(sender: type[ExpenseFilter], instance: Expense except Exception as e: logger.error(f'Error while processing expense filter for workspace: {instance.workspace.id} - {str(e)}') raise ValidationError('Failed to process expense filter') + + +@receiver(pre_save, sender=ExpenseGroupSettings) +def run_pre_save_expense_group_setting_triggers(sender: type[ExpenseGroupSettings], instance: ExpenseGroupSettings, **kwargs) -> None: + """ + Run pre save expense group setting triggers + """ + existing_expense_group_setting = ExpenseGroupSettings.objects.filter( + workspace_id=instance.workspace_id + ).first() + + if existing_expense_group_setting: + configuration = Configuration.objects.filter(workspace_id=instance.workspace_id).first() + if configuration: + # TODO: move these async_tasks to maintenance worker later + if configuration.reimbursable_expenses_object and existing_expense_group_setting.expense_state != instance.expense_state and existing_expense_group_setting.expense_state == ExpenseStateEnum.PAID and instance.expense_state == ExpenseStateEnum.PAYMENT_PROCESSING: + logger.info(f'Reimbursable expense state changed from {existing_expense_group_setting.expense_state} to {instance.expense_state} for workspace {instance.workspace_id}, so pulling the data from Fyle') + async_task('apps.fyle.tasks.create_expense_groups', workspace_id=instance.workspace_id, fund_source=[FundSourceEnum.PERSONAL], task_log=None, imported_from=ExpenseImportSourceEnum.CONFIGURATION_UPDATE) + + if configuration.corporate_credit_card_expenses_object and existing_expense_group_setting.ccc_expense_state != instance.ccc_expense_state and existing_expense_group_setting.ccc_expense_state == ExpenseStateEnum.PAID and instance.ccc_expense_state == ExpenseStateEnum.APPROVED: + logger.info(f'Corporate credit card expense state changed from {existing_expense_group_setting.ccc_expense_state} to {instance.ccc_expense_state} for workspace {instance.workspace_id}, so pulling the data from Fyle') + async_task('apps.fyle.tasks.create_expense_groups', workspace_id=instance.workspace_id, fund_source=[FundSourceEnum.CCC], task_log=None, imported_from=ExpenseImportSourceEnum.CONFIGURATION_UPDATE) diff --git a/apps/fyle/tasks.py b/apps/fyle/tasks.py index 280d1ff0..e8c65375 100644 --- a/apps/fyle/tasks.py +++ b/apps/fyle/tasks.py @@ -13,6 +13,7 @@ InternalServerError, InvalidTokenError ) +from fyle_accounting_library.fyle_platform.branding import feature_configuration from fyle_accounting_library.fyle_platform.helpers import get_expense_import_states, filter_expenses_based_on_state from fyle_accounting_library.fyle_platform.enums import ExpenseImportSourceEnum @@ -22,7 +23,8 @@ LastExportDetail, Workspace, FyleCredential, - Configuration + Configuration, + WorkspaceSchedule ) from apps.fyle.models import ( Expense, @@ -34,7 +36,8 @@ get_fund_source, get_source_account_type, handle_import_exception, - construct_expense_filter_query + construct_expense_filter_query, + update_task_log_post_import ) from apps.fyle.actions import ( mark_expenses_as_skipped, @@ -86,7 +89,7 @@ def schedule_expense_group_creation(workspace_id: int) -> None: async_task('apps.fyle.tasks.create_expense_groups', workspace_id, fund_source, task_log) -def create_expense_groups(workspace_id: int, fund_source: list[str], task_log: TaskLog, imported_from: ExpenseImportSourceEnum) -> None: +def create_expense_groups(workspace_id: int, fund_source: list[str], task_log: TaskLog | None, imported_from: ExpenseImportSourceEnum) -> None: """ Create expense groups :param task_log: Task log object @@ -97,8 +100,8 @@ def create_expense_groups(workspace_id: int, fund_source: list[str], task_log: T with transaction.atomic(): workspace = Workspace.objects.get(pk=workspace_id) - last_synced_at = workspace.last_synced_at - ccc_last_synced_at = workspace.ccc_last_synced_at + last_synced_at = workspace.last_synced_at if imported_from != ExpenseImportSourceEnum.CONFIGURATION_UPDATE else None + ccc_last_synced_at = workspace.ccc_last_synced_at if imported_from != ExpenseImportSourceEnum.CONFIGURATION_UPDATE else None fyle_credentials = FyleCredential.objects.get(workspace_id=workspace_id) expense_group_settings = ExpenseGroupSettings.objects.get(workspace_id=workspace_id) @@ -140,61 +143,38 @@ def create_expense_groups(workspace_id: int, fund_source: list[str], task_log: T if workspace.ccc_last_synced_at or len(expenses) != reimbursable_expense_count: workspace.ccc_last_synced_at = datetime.now() - workspace.save() + if imported_from != ExpenseImportSourceEnum.CONFIGURATION_UPDATE: + workspace.save() group_expenses_and_save(expenses, task_log, workspace, imported_from=imported_from) except NoPrivilegeError: logger.info('Invalid Fyle Credentials / Admin is disabled') - task_log.detail = { - 'message': 'Invalid Fyle Credentials / Admin is disabled' - } - task_log.status = 'FAILED' - task_log.save() + update_task_log_post_import(task_log, 'FAILED', message='Invalid Fyle Credentials / Admin is disabled') except FyleCredential.DoesNotExist: logger.info('Fyle credentials not found %s', workspace_id) - task_log.detail = { - 'message': 'Fyle credentials do not exist in workspace' - } - task_log.status = 'FAILED' - task_log.save() + update_task_log_post_import(task_log, 'FAILED', message='Fyle credentials do not exist in workspace') except RetryException: logger.info('Fyle Retry Exception occured in workspace_id: %s', workspace_id) - task_log.detail = { - 'message': 'Fyle Retry Exception occured' - } - task_log.status = 'FATAL' - task_log.save() + update_task_log_post_import(task_log, 'FATAL', message='Fyle Retry Exception occured') except InvalidTokenError: logger.info('Invalid Token for Fyle') - task_log.detail = { - 'message': 'Invalid Token for Fyle' - } - task_log.status = 'FAILED' - task_log.save() + update_task_log_post_import(task_log, 'FAILED', message='Invalid Token for Fyle') except InternalServerError: logger.info('Fyle Internal Server Error occured in workspace_id: %s', workspace_id) - task_log.detail = { - 'message': 'Fyle Internal Server Error occured' - } - task_log.status = 'FAILED' - task_log.save() + update_task_log_post_import(task_log, 'FAILED', message='Fyle Internal Server Error occured') except Exception: error = traceback.format_exc() - task_log.detail = { - 'error': error - } - task_log.status = 'FATAL' - task_log.save() + update_task_log_post_import(task_log, 'FATAL', error=error) logger.exception('Something unexpected happened workspace_id: %s %s', task_log.workspace_id, task_log.detail) -def group_expenses_and_save(expenses: list[dict], task_log: TaskLog, workspace: Workspace, imported_from: ExpenseImportSourceEnum = None) -> None: +def group_expenses_and_save(expenses: list[dict], task_log: TaskLog | None, workspace: Workspace, imported_from: ExpenseImportSourceEnum = None) -> None: """ Group expenses and save :param expenses: Expenses @@ -237,8 +217,10 @@ def group_expenses_and_save(expenses: list[dict], task_log: TaskLog, workspace: except Exception: logger.error('Error posting accounting export summary for workspace_id: %s', workspace.id) - task_log.status = 'COMPLETE' - task_log.save() + if task_log: + task_log.status = 'COMPLETE' + task_log.updated_at = datetime.now() + task_log.save(update_fields=['status', 'updated_at']) def import_and_export_expenses(report_id: str, org_id: str, is_state_change_event: bool, report_state: str = None, imported_from: ExpenseImportSourceEnum = None) -> None: @@ -282,8 +264,17 @@ def import_and_export_expenses(report_id: str, org_id: str, is_state_change_even expense_groups = ExpenseGroup.objects.filter(expenses__id__in=[expense_ids], workspace_id=workspace.id, exported_at__isnull=True).distinct('id').values('id') expense_group_ids = [expense_group['id'] for expense_group in expense_groups] - if len(expense_group_ids) and not is_state_change_event: - export_to_intacct(workspace.id, None, expense_group_ids, triggered_by=imported_from) + if len(expense_group_ids): + if is_state_change_event: + # Trigger export immediately for customers who have enabled real time export + is_real_time_export_enabled = WorkspaceSchedule.objects.filter(workspace_id=workspace.id, is_real_time_export_enabled=True).exists() + + # Don't allow real time export if it's not supported for the branded app / setting not enabled + if not is_real_time_export_enabled or not feature_configuration.feature.real_time_export_1hr_orgs: + return + + logger.info(f'Exporting expenses for workspace {workspace.id} with expense group ids {expense_group_ids}, triggered by {imported_from}') + export_to_intacct(workspace_id=workspace.id, expense_group_ids=expense_group_ids, triggered_by=imported_from) except Configuration.DoesNotExist: logger.info('Configuration does not exist for workspace_id: %s', workspace.id) diff --git a/apps/fyle/views.py b/apps/fyle/views.py index 94229206..5da3b4b2 100644 --- a/apps/fyle/views.py +++ b/apps/fyle/views.py @@ -73,7 +73,7 @@ def post(self, request: Request, *args, **kwargs) -> Response: fund_source.append('CCC') create_expense_groups( - kwargs['workspace_id'], + workspace_id=kwargs['workspace_id'], fund_source=fund_source, task_log=task_log, imported_from=ExpenseImportSourceEnum.DASHBOARD_SYNC @@ -512,7 +512,7 @@ def post(self, request: Request, *args, **kwargs) -> Response: """ task_log, fund_source = get_task_log_and_fund_source(kwargs['workspace_id']) - create_expense_groups(kwargs['workspace_id'], fund_source, task_log, imported_from=ExpenseImportSourceEnum.DASHBOARD_SYNC) + create_expense_groups(workspace_id=kwargs['workspace_id'], fund_source=fund_source, task_log=task_log, imported_from=ExpenseImportSourceEnum.DASHBOARD_SYNC) return Response( status=status.HTTP_200_OK diff --git a/apps/internal/tasks.py b/apps/internal/tasks.py index 2b4887e2..198b0283 100644 --- a/apps/internal/tasks.py +++ b/apps/internal/tasks.py @@ -67,7 +67,7 @@ def re_export_stuck_exports() -> None: export_expense_group_ids = list(expense_groups.filter(workspace_id=workspace_id).values_list('id', flat=True)) if export_expense_group_ids and len(export_expense_group_ids) < 200: logger.info('Re-triggering export for expense group %s since no 1 hour schedule for workspace %s', export_expense_group_ids, workspace_id) - export_to_intacct(workspace_id, 'AUTO', export_expense_group_ids, triggered_by=ExpenseImportSourceEnum.INTERNAL) + export_to_intacct(workspace_id=workspace_id, expense_group_ids=export_expense_group_ids, triggered_by=ExpenseImportSourceEnum.INTERNAL) else: logger.info('Skipping export for workspace %s since it has more than 200 expense groups', workspace_id) diff --git a/apps/sage_intacct/queue.py b/apps/sage_intacct/queue.py index 8bd09ed3..fc802558 100644 --- a/apps/sage_intacct/queue.py +++ b/apps/sage_intacct/queue.py @@ -89,7 +89,7 @@ def schedule_journal_entries_creation( ) if task_log.status not in ['IN_PROGRESS', 'ENQUEUED']: task_log.status = 'ENQUEUED' - if task_log.triggered_by != triggered_by: + if triggered_by and task_log.triggered_by != triggered_by: task_log.triggered_by = triggered_by task_log.save() @@ -190,7 +190,7 @@ def schedule_expense_reports_creation(workspace_id: int, expense_group_ids: list ) if task_log.status not in ['IN_PROGRESS', 'ENQUEUED']: task_log.status = 'ENQUEUED' - if task_log.triggered_by != triggered_by: + if triggered_by and task_log.triggered_by != triggered_by: task_log.triggered_by = triggered_by task_log.save() @@ -254,7 +254,7 @@ def schedule_bills_creation(workspace_id: int, expense_group_ids: list[str], is_ ) if task_log.status not in ['IN_PROGRESS', 'ENQUEUED']: task_log.status = 'ENQUEUED' - if task_log.triggered_by != triggered_by: + if triggered_by and task_log.triggered_by != triggered_by: task_log.triggered_by = triggered_by task_log.save() @@ -318,7 +318,7 @@ def schedule_charge_card_transaction_creation(workspace_id: int, expense_group_i ) if task_log.status not in ['IN_PROGRESS', 'ENQUEUED']: task_log.status = 'ENQUEUED' - if task_log.triggered_by != triggered_by: + if triggered_by and task_log.triggered_by != triggered_by: task_log.triggered_by = triggered_by task_log.save() diff --git a/apps/sage_intacct/views.py b/apps/sage_intacct/views.py index 188cf357..928618c4 100644 --- a/apps/sage_intacct/views.py +++ b/apps/sage_intacct/views.py @@ -13,6 +13,7 @@ from fyle_accounting_mappings.serializers import DestinationAttributeSerializer from fyle_accounting_library.common_resources.models import DimensionDetail from fyle_accounting_library.common_resources.enums import DimensionDetailSourceTypeEnum +from fyle_accounting_library.fyle_platform.enums import ExpenseImportSourceEnum from sageintacctsdk.exceptions import InvalidTokenError @@ -124,7 +125,7 @@ def post(self, request: Request, *args, **kwargs) -> Response: """ Trigger exports """ - export_to_intacct(workspace_id=self.kwargs['workspace_id']) + export_to_intacct(workspace_id=self.kwargs['workspace_id'], triggered_by=ExpenseImportSourceEnum.DASHBOARD_SYNC) return Response( status=status.HTTP_200_OK diff --git a/apps/workspaces/actions.py b/apps/workspaces/actions.py index 9e859004..5f6b807f 100644 --- a/apps/workspaces/actions.py +++ b/apps/workspaces/actions.py @@ -19,7 +19,7 @@ logger.level = logging.INFO -def export_to_intacct(workspace_id: int, export_mode: bool = None, expense_group_ids: list = [], triggered_by: ExpenseImportSourceEnum = None) -> None: +def export_to_intacct(workspace_id: int, expense_group_ids: list = [], triggered_by: ExpenseImportSourceEnum = None) -> None: """ Export expenses to Intacct :param workspace_id: Workspace ID @@ -33,7 +33,7 @@ def export_to_intacct(workspace_id: int, export_mode: bool = None, expense_group last_exported_at = datetime.now() is_expenses_exported = False - export_mode = export_mode or 'MANUAL' + export_mode = 'MANUAL' if triggered_by in (ExpenseImportSourceEnum.DASHBOARD_SYNC, ExpenseImportSourceEnum.DIRECT_EXPORT, ExpenseImportSourceEnum.CONFIGURATION_UPDATE) else 'AUTO' expense_group_filters = { 'exported_at__isnull': True, 'workspace_id': workspace_id diff --git a/apps/workspaces/apis/advanced_settings/serializers.py b/apps/workspaces/apis/advanced_settings/serializers.py index 426d85ef..160fc21b 100644 --- a/apps/workspaces/apis/advanced_settings/serializers.py +++ b/apps/workspaces/apis/advanced_settings/serializers.py @@ -146,7 +146,8 @@ class Meta: 'enabled', 'interval_hours', 'additional_email_options', - 'emails_selected' + 'emails_selected', + 'is_real_time_export_enabled' ] diff --git a/apps/workspaces/apis/advanced_settings/triggers.py b/apps/workspaces/apis/advanced_settings/triggers.py index 09fbb558..e7766d90 100644 --- a/apps/workspaces/apis/advanced_settings/triggers.py +++ b/apps/workspaces/apis/advanced_settings/triggers.py @@ -17,7 +17,8 @@ def run_post_configurations_triggers(workspace_id: int, workspace_schedule: Work schedule_enabled=workspace_schedule.get('enabled'), hours=workspace_schedule.get('interval_hours'), email_added=workspace_schedule.get('additional_email_options'), - emails_selected=workspace_schedule.get('emails_selected') + emails_selected=workspace_schedule.get('emails_selected'), + is_real_time_export_enabled=workspace_schedule.get('is_real_time_export_enabled') ) @staticmethod diff --git a/apps/workspaces/migrations/0045_workspaceschedule_is_real_time_export_enabled.py b/apps/workspaces/migrations/0045_workspaceschedule_is_real_time_export_enabled.py new file mode 100644 index 00000000..e38c4a1e --- /dev/null +++ b/apps/workspaces/migrations/0045_workspaceschedule_is_real_time_export_enabled.py @@ -0,0 +1,18 @@ +# Generated by Django 4.2.21 on 2025-05-21 17:04 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('workspaces', '0044_configuration_je_single_credit_line'), + ] + + operations = [ + migrations.AddField( + model_name='workspaceschedule', + name='is_real_time_export_enabled', + field=models.BooleanField(default=False), + ), + ] diff --git a/apps/workspaces/models.py b/apps/workspaces/models.py index 3019053a..cf977fac 100644 --- a/apps/workspaces/models.py +++ b/apps/workspaces/models.py @@ -207,6 +207,7 @@ class WorkspaceSchedule(models.Model): error_count = models.IntegerField(null=True, help_text='Number of errors in export') additional_email_options = JSONField(default=list, help_text='Email and Name of person to send email', null=True) emails_selected = ArrayField(base_field=models.CharField(max_length=255), null=True, help_text='Emails that has to be send mail') + is_real_time_export_enabled = models.BooleanField(default=False) schedule = models.OneToOneField(Schedule, on_delete=models.PROTECT, null=True) created_at = models.DateTimeField(auto_now_add=True, null=True, help_text='Created at datetime') updated_at = models.DateTimeField(auto_now=True, null=True, help_text='Updated at datetime') diff --git a/apps/workspaces/tasks.py b/apps/workspaces/tasks.py index 35683a49..40c9b4f4 100644 --- a/apps/workspaces/tasks.py +++ b/apps/workspaces/tasks.py @@ -28,14 +28,14 @@ logger.level = logging.INFO -def schedule_email_notification(workspace_id: int, schedule_enabled: bool) -> None: +def schedule_email_notification(workspace_id: int, schedule_enabled: bool, hours: int) -> None: """ Schedule email notification :param workspace_id: workspace id :param schedule_enabled: schedule enabled :return: None """ - if schedule_enabled: + if schedule_enabled and hours: schedule, _ = Schedule.objects.update_or_create( func='apps.workspaces.tasks.run_email_notification', cluster='import', @@ -56,7 +56,7 @@ def schedule_email_notification(workspace_id: int, schedule_enabled: bool) -> No schedule.delete() -def schedule_sync(workspace_id: int, schedule_enabled: bool, hours: int, email_added: list, emails_selected: list) -> WorkspaceSchedule: +def schedule_sync(workspace_id: int, schedule_enabled: bool, hours: int, email_added: list, emails_selected: list, is_real_time_export_enabled: bool) -> WorkspaceSchedule: """ Schedule sync :param workspace_id: workspace id @@ -70,31 +70,36 @@ def schedule_sync(workspace_id: int, schedule_enabled: bool, hours: int, email_a workspace_id=workspace_id ) - schedule_email_notification(workspace_id=workspace_id, schedule_enabled=schedule_enabled) + schedule_email_notification(workspace_id=workspace_id, schedule_enabled=schedule_enabled, hours=hours) if schedule_enabled: ws_schedule.enabled = schedule_enabled ws_schedule.start_datetime = datetime.now() ws_schedule.interval_hours = hours ws_schedule.emails_selected = emails_selected + ws_schedule.is_real_time_export_enabled = is_real_time_export_enabled if email_added: ws_schedule.additional_email_options.append(email_added) - # create next run by adding hours to current time - next_run = datetime.now() + timedelta(hours=hours) - - schedule, _ = Schedule.objects.update_or_create( - func='apps.workspaces.tasks.run_sync_schedule', - args='{}'.format(workspace_id), - defaults={ - 'schedule_type': Schedule.MINUTES, - 'minutes': hours * 60, - 'next_run': next_run - } - ) - - ws_schedule.schedule = schedule + if is_real_time_export_enabled: + # Delete existing schedule since user changed the setting to real time export + schedule = ws_schedule.schedule + if schedule: + ws_schedule.schedule = None + ws_schedule.save() + schedule.delete() + else: + schedule, _ = Schedule.objects.update_or_create( + func='apps.workspaces.tasks.run_sync_schedule', + args='{}'.format(workspace_id), + defaults={ + 'schedule_type': Schedule.MINUTES, + 'minutes': hours * 60, + 'next_run': datetime.now() + timedelta(hours=hours), + } + ) + ws_schedule.schedule = schedule ws_schedule.save() @@ -135,7 +140,7 @@ def run_sync_schedule(workspace_id: int) -> None: ) if task_log.status == 'COMPLETE': - export_to_intacct(workspace_id, 'AUTO', triggered_by=ExpenseImportSourceEnum.BACKGROUND_SCHEDULE) + export_to_intacct(workspace_id=workspace_id, triggered_by=ExpenseImportSourceEnum.BACKGROUND_SCHEDULE) def run_email_notification(workspace_id: int) -> None: diff --git a/apps/workspaces/urls.py b/apps/workspaces/urls.py index 5dcc5e4a..5cb98c3e 100644 --- a/apps/workspaces/urls.py +++ b/apps/workspaces/urls.py @@ -2,7 +2,6 @@ from apps.workspaces.views import ( ReadyView, - ScheduleView, WorkspaceView, ConnectFyleView, ConfigurationsView, @@ -18,7 +17,6 @@ path('/configuration/', ConfigurationsView.as_view()), path('ready/', ReadyView.as_view({'get': 'get'})), path('/exports/trigger/', ExportToIntacctView.as_view({'post': 'post'}), name='export-to-intacct'), - path('/schedule/', ScheduleView.as_view({'post': 'post', 'get': 'get'})), path('/admins/', WorkspaceAdminsView.as_view({'get': 'get'}), name='admin'), path('/export_detail/', LastExportDetailView.as_view(), name='export-detail') ] diff --git a/apps/workspaces/views.py b/apps/workspaces/views.py index dd6b6ec2..42935934 100644 --- a/apps/workspaces/views.py +++ b/apps/workspaces/views.py @@ -1,5 +1,6 @@ from cryptography.fernet import Fernet +from django.db.models import Q, QuerySet from django.conf import settings from django.core.cache import cache from django_q.tasks import async_task @@ -25,14 +26,13 @@ from apps.fyle.models import ExpenseGroupSettings from apps.fyle.helpers import get_cluster_domain +from apps.tasks.models import TaskLog from apps.workspaces.actions import export_to_intacct -from apps.workspaces.tasks import schedule_sync from apps.workspaces.models import ( Workspace, Configuration, FyleCredential, LastExportDetail, - WorkspaceSchedule, SageIntacctCredential ) from apps.workspaces.serializers import ( @@ -40,7 +40,6 @@ ConfigurationSerializer, FyleCredentialSerializer, LastExportDetailSerializer, - WorkspaceScheduleSerializer, SageIntacctCredentialSerializer, ) @@ -451,54 +450,6 @@ def patch(self, request: Request, **kwargs) -> Response: ) -class ScheduleView(viewsets.ViewSet): - """ - Schedule View - """ - def post(self, request: Request, **kwargs) -> Response: - """ - Post Settings - """ - schedule_enabled = request.data.get('schedule_enabled') - assert_valid(schedule_enabled is not None, 'Schedule enabled cannot be null') - - hours = request.data.get('hours') - assert_valid(hours is not None, 'Hours cannot be left empty') - - email_added = request.data.get('added_email') - emails_selected = request.data.get('selected_email') - - workspace_schedule_settings = schedule_sync( - workspace_id=kwargs['workspace_id'], - schedule_enabled=schedule_enabled, - hours=hours, - email_added=email_added, - emails_selected=emails_selected - ) - - return Response( - data=WorkspaceScheduleSerializer(workspace_schedule_settings).data, - status=status.HTTP_200_OK - ) - - def get(self, *args, **kwargs) -> Response: - try: - schedule = WorkspaceSchedule.objects.get(workspace_id=kwargs['workspace_id']) - - return Response( - data=WorkspaceScheduleSerializer(schedule).data, - status=status.HTTP_200_OK - ) - - except WorkspaceSchedule.DoesNotExist: - return Response( - data={ - 'message': 'Schedule settings does not exist in workspace' - }, - status=status.HTTP_400_BAD_REQUEST - ) - - class WorkspaceAdminsView(viewsets.ViewSet): """ Workspace Admins View @@ -533,6 +484,42 @@ class LastExportDetailView(generics.RetrieveAPIView): queryset = LastExportDetail.objects.filter(last_exported_at__isnull=False, total_expense_groups_count__gt=0) serializer_class = LastExportDetailSerializer + def get_queryset(self) -> QuerySet[LastExportDetail]: + return super().get_queryset() + + def retrieve(self, request: Request, *args, **kwargs) -> Response: + instance = self.get_object() + serializer = self.get_serializer(instance) + response_data = serializer.data + + start_date = request.query_params.get('start_date') + + if start_date and response_data: + misc_task_log_types = ['CREATING_REIMBURSEMENT', 'CREATING_AP_PAYMENT', 'FETCHING_EXPENSES'] + + task_logs = TaskLog.objects.filter( + ~Q(type__in=misc_task_log_types), + workspace_id=kwargs['workspace_id'], + updated_at__gte=start_date, + status='COMPLETE', + ).order_by('-updated_at') + + successful_count = task_logs.count() + + failed_count = TaskLog.objects.filter( + ~Q(type__in=misc_task_log_types), + status__in=['FAILED', 'FATAL'], + workspace_id=kwargs['workspace_id'], + ).count() + + response_data.update({ + 'repurposed_successful_count': successful_count, + 'repurposed_failed_count': failed_count, + 'repurposed_last_exported_at': task_logs.last().updated_at if task_logs.last() else None + }) + + return Response(response_data) + class ExportToIntacctView(viewsets.ViewSet): """ diff --git a/requirements.txt b/requirements.txt index f2d3de7c..a894c955 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,7 +10,7 @@ djangorestframework==3.15.2 django-sendgrid-v5==1.2.0 future==1.0.0 fyle==1.0.0 -fyle-accounting-mappings==1.45.0 +fyle-accounting-mappings==2.2.0 fyle-integrations-platform-connector==2.2.2 fyle-rest-auth==1.7.2 gevent==24.11.1 diff --git a/tests/sql_fixtures/reset_db_fixtures/reset_db.sql b/tests/sql_fixtures/reset_db_fixtures/reset_db.sql index 2c973c1e..05a7dbd2 100644 --- a/tests/sql_fixtures/reset_db_fixtures/reset_db.sql +++ b/tests/sql_fixtures/reset_db_fixtures/reset_db.sql @@ -3,7 +3,7 @@ -- -- Dumped from database version 15.12 (Debian 15.12-1.pgdg120+1) --- Dumped by pg_dump version 15.12 (Debian 15.12-0+deb12u2) +-- Dumped by pg_dump version 15.13 (Debian 15.13-0+deb12u1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -2796,7 +2796,8 @@ CREATE TABLE public.workspace_schedules ( emails_selected character varying(255)[], error_count integer, created_at timestamp with time zone, - updated_at timestamp with time zone + updated_at timestamp with time zone, + is_real_time_export_enabled boolean NOT NULL ); @@ -2857,7 +2858,8 @@ CREATE TABLE public.failed_events ( created_at timestamp with time zone NOT NULL, updated_at timestamp with time zone NOT NULL, error_traceback text, - workspace_id integer + workspace_id integer, + is_resolved boolean NOT NULL ); @@ -5959,6 +5961,8 @@ COPY public.django_migrations (id, app, name, applied) FROM stdin; 232 workspaces 0043_configuration_skip_accounting_export_summary_post 2025-04-24 16:15:00.283053+00 233 workspaces 0044_configuration_je_single_credit_line 2025-05-07 18:31:07.544615+00 234 sage_intacct 0031_costcode 2025-05-12 09:47:16.361962+00 +235 rabbitmq 0004_failedevent_is_resolved 2025-05-21 17:02:59.151171+00 +236 workspaces 0045_workspaceschedule_is_real_time_export_enabled 2025-05-21 17:05:12.778782+00 \. @@ -9408,7 +9412,7 @@ COPY public.expenses (id, employee_email, category, sub_category, project, expen -- Data for Name: failed_events; Type: TABLE DATA; Schema: public; Owner: postgres -- -COPY public.failed_events (id, routing_key, payload, created_at, updated_at, error_traceback, workspace_id) FROM stdin; +COPY public.failed_events (id, routing_key, payload, created_at, updated_at, error_traceback, workspace_id, is_resolved) FROM stdin; \. @@ -9888,7 +9892,7 @@ COPY public.users (password, last_login, id, email, user_id, full_name, active, -- Data for Name: workspace_schedules; Type: TABLE DATA; Schema: public; Owner: postgres -- -COPY public.workspace_schedules (id, enabled, start_datetime, interval_hours, schedule_id, workspace_id, additional_email_options, emails_selected, error_count, created_at, updated_at) FROM stdin; +COPY public.workspace_schedules (id, enabled, start_datetime, interval_hours, schedule_id, workspace_id, additional_email_options, emails_selected, error_count, created_at, updated_at, is_real_time_export_enabled) FROM stdin; \. @@ -9998,7 +10002,7 @@ SELECT pg_catalog.setval('public.django_content_type_id_seq', 53, true); -- Name: django_migrations_id_seq; Type: SEQUENCE SET; Schema: public; Owner: postgres -- -SELECT pg_catalog.setval('public.django_migrations_id_seq', 233, true); +SELECT pg_catalog.setval('public.django_migrations_id_seq', 236, true); -- diff --git a/tests/test_fyle/fixtures.py b/tests/test_fyle/fixtures.py index 1c8c2782..dfde9195 100644 --- a/tests/test_fyle/fixtures.py +++ b/tests/test_fyle/fixtures.py @@ -40,7 +40,10 @@ "Fyle Categories": "", }, "bank_transaction_id": None, - "is_posted_at_null": False + "is_posted_at_null": False, + "report_title": "#5: May 2024", + "payment_number": "P/2023/8/T/1221", + "masked_corporate_card_number": "**** 4567" }, ], "raw_expense": { diff --git a/tests/test_fyle/test_signals.py b/tests/test_fyle/test_signals.py new file mode 100644 index 00000000..43800877 --- /dev/null +++ b/tests/test_fyle/test_signals.py @@ -0,0 +1,127 @@ +from fyle_accounting_library.fyle_platform.enums import FundSourceEnum, ExpenseImportSourceEnum, ExpenseStateEnum + +from apps.fyle.models import ExpenseGroupSettings +from apps.workspaces.models import Configuration + + +def test_run_pre_save_expense_group_setting_triggers_no_existing_settings(db, mocker): + """ + Test when there are no existing expense group settings + """ + workspace_id = 1 + Configuration.objects.filter(workspace_id=workspace_id).delete() + expense_group_settings = ExpenseGroupSettings.objects.get(workspace_id=workspace_id) + + mock_async = mocker.patch('apps.fyle.signals.async_task') + + # Save should not trigger any async tasks since there's no existing settings + expense_group_settings.save() + mock_async.assert_not_called() + + +def test_run_pre_save_expense_group_setting_triggers_reimbursable_state_change(db, mocker): + """ + Test when reimbursable expense state changes from PAID to PAYMENT_PROCESSING + """ + workspace_id = 1 + + expense_group_settings, _ = ExpenseGroupSettings.objects.update_or_create( + workspace_id=workspace_id, + defaults={ + 'expense_state': ExpenseStateEnum.PAID, + 'ccc_expense_state': ExpenseStateEnum.PAID + } + ) + + mock_async = mocker.patch('apps.fyle.signals.async_task') + + # Change reimbursable state + expense_group_settings.expense_state = ExpenseStateEnum.PAYMENT_PROCESSING + expense_group_settings.save() + + # Verify async_task was called with correct parameters + mock_async.assert_called_once_with( + 'apps.fyle.tasks.create_expense_groups', + workspace_id=workspace_id, + task_log=None, + fund_source=[FundSourceEnum.PERSONAL], + imported_from=ExpenseImportSourceEnum.CONFIGURATION_UPDATE + ) + + +def test_run_pre_save_expense_group_setting_triggers_ccc_state_change(db, mocker): + """ + Test when corporate credit card expense state changes from PAID to APPROVED + """ + workspace_id = 1 + + expense_group_settings, _ = ExpenseGroupSettings.objects.update_or_create( + workspace_id=workspace_id, + defaults={ + 'expense_state': ExpenseStateEnum.PAYMENT_PROCESSING, + 'ccc_expense_state': ExpenseStateEnum.PAID + } + ) + + mock_async = mocker.patch('apps.fyle.signals.async_task') + + # Change CCC state + expense_group_settings.ccc_expense_state = ExpenseStateEnum.APPROVED + expense_group_settings.save() + + # Verify async_task was called with correct parameters + mock_async.assert_called_once_with( + 'apps.fyle.tasks.create_expense_groups', + workspace_id=workspace_id, + task_log=None, + fund_source=[FundSourceEnum.CCC], + imported_from=ExpenseImportSourceEnum.CONFIGURATION_UPDATE + ) + + +def test_run_pre_save_expense_group_setting_triggers_no_configuration(db, mocker): + """ + Test when workspace general settings don't exist + """ + workspace_id = 1 + + Configuration.objects.filter(workspace_id=workspace_id).delete() + expense_group_settings, _ = ExpenseGroupSettings.objects.update_or_create( + workspace_id=workspace_id, + defaults={ + 'expense_state': ExpenseStateEnum.PAID, + 'ccc_expense_state': ExpenseStateEnum.PAID + } + ) + + mock_async = mocker.patch('apps.fyle.signals.async_task') + + expense_group_settings.expense_state = ExpenseStateEnum.PAYMENT_PROCESSING + expense_group_settings.ccc_expense_state = ExpenseStateEnum.APPROVED + expense_group_settings.save() + + # Verify no async tasks were called due to missing configuration + mock_async.assert_not_called() + + +def test_run_pre_save_expense_group_setting_triggers_no_state_change(db, mocker): + """ + Test when expense states don't change + """ + workspace_id = 1 + + expense_group_settings, _ = ExpenseGroupSettings.objects.update_or_create( + workspace_id=workspace_id, + defaults={ + 'expense_state': ExpenseStateEnum.PAID, + 'ccc_expense_state': ExpenseStateEnum.PAID + } + ) + + mock_async = mocker.patch('apps.fyle.signals.async_task') + + # Save without changing states + expense_group_settings.save() + + # Verify no async tasks were called + mock_async.assert_not_called() diff --git a/tests/test_workspaces/fixtures.py b/tests/test_workspaces/fixtures.py index 4ed80586..b8b635f0 100644 --- a/tests/test_workspaces/fixtures.py +++ b/tests/test_workspaces/fixtures.py @@ -63,6 +63,5 @@ 'is_posted_at_null': True, 'masked_corporate_card_number': '**** 4567' } - ], - 'workspace_schedule': {'id': 1, 'enabled': True, 'start_datetime': '2022-09-26T13:08:16.281604Z', 'interval_hours': 1, 'error_count': None, 'additional_email_options': [], 'emails_selected': ['ashwin.t@fyle.in'], 'workspace': 1, 'schedule': 9, 'updated_at': '2022-05-13', 'created_at': '2022-05-13'} + ] } diff --git a/tests/test_workspaces/test_apis/test_advanced_settings/fixtures.py b/tests/test_workspaces/test_apis/test_advanced_settings/fixtures.py index f712f235..1f0447a8 100644 --- a/tests/test_workspaces/test_apis/test_advanced_settings/fixtures.py +++ b/tests/test_workspaces/test_apis/test_advanced_settings/fixtures.py @@ -24,6 +24,7 @@ "interval_hours": 24, "emails_selected": ["fyle@fyle.in"], "additional_email_options": {}, + "is_real_time_export_enabled": False }, }, "response": { @@ -52,6 +53,7 @@ "interval_hours": 24, "emails_selected": [], "additional_email_options": [], + "is_real_time_export_enabled": False }, "workspace_id": 9, }, diff --git a/tests/test_workspaces/test_tasks.py b/tests/test_workspaces/test_tasks.py index 844631d9..ca747063 100644 --- a/tests/test_workspaces/test_tasks.py +++ b/tests/test_workspaces/test_tasks.py @@ -37,7 +37,8 @@ def test_schedule_sync(db): emails_selected=[ 'ashwin.t@fyle.in' ], - workspace_id=workspace_id + workspace_id=workspace_id, + is_real_time_export_enabled=False ) ws_schedule = WorkspaceSchedule.objects.filter( @@ -53,7 +54,8 @@ def test_schedule_sync(db): emails_selected=[ 'ashwin.t@fyle.in' ], - workspace_id=workspace_id + workspace_id=workspace_id, + is_real_time_export_enabled=False ) ws_schedule = WorkspaceSchedule.objects.filter( @@ -140,7 +142,8 @@ def test_email_notification(mocker,db): emails_selected=[ 'user5@fyleforgotham.in' ], - workspace_id=workspace_id + workspace_id=workspace_id, + is_real_time_export_enabled=False ) ws_schedule = WorkspaceSchedule.objects.filter( diff --git a/tests/test_workspaces/test_views.py b/tests/test_workspaces/test_views.py index 6bb2266e..1855917e 100644 --- a/tests/test_workspaces/test_views.py +++ b/tests/test_workspaces/test_views.py @@ -1,4 +1,5 @@ import json +from datetime import datetime from unittest import mock from django.urls import reverse @@ -11,8 +12,9 @@ from tests.helper import dict_compare_keys +from apps.tasks.models import TaskLog from apps.mappings.models import ImportLog -from apps.workspaces.models import WorkspaceSchedule, SageIntacctCredential, Configuration, LastExportDetail, Workspace +from apps.workspaces.models import SageIntacctCredential, Configuration, LastExportDetail, Workspace from .fixtures import data from tests.test_fyle.fixtures import data as fyle_data @@ -345,44 +347,6 @@ def test_connect_sageintacct_view_exceptions(api_client, test_connection): assert response.status_code == 401 -def test_workspace_schedule(api_client, test_connection): - """ - Test Workspace Schedule - """ - workspace_id = 1 - - url = '/api/workspaces/{}/schedule/'.format(workspace_id) - - api_client.credentials(HTTP_AUTHORIZATION='Bearer {}'.format(test_connection.access_token)) - - response = api_client.get(url) - - WorkspaceSchedule.objects.get_or_create( - workspace_id=workspace_id - ) - response = api_client.get(url) - - response = json.loads(response.content) - assert dict_compare_keys(response, data['workspace_schedule']) == [], 'workspace_schedule api returns a diff in keys' - - response = api_client.post( - url, - data={ - "hours": 1, - "schedule_enabled": True, - "added_email": None, - "selected_email": [ - "ashwin.t@fyle.in" - ] - }, - format='json' - ) - assert response.status_code == 200 - - response = json.loads(response.content) - assert dict_compare_keys(response, data['workspace_schedule']) == [], 'workspace_schedule api returns a diff in keys' - - def test_general_settings_detail(api_client, test_connection): """ Test General Settings Detail @@ -483,6 +447,39 @@ def test_last_export_detail_view(mocker, db, api_client, test_connection): assert response.status_code == 404 +def test_last_export_detail_2(mocker, api_client, test_connection): + """ + Test Last Export Detail View + """ + workspace_id = 1 + + Configuration.objects.filter(workspace_id=workspace_id).update( + reimbursable_expenses_object='BILL', + corporate_credit_card_expenses_object='BILL' + ) + + url = "/api/workspaces/{}/export_detail/?start_date=2025-05-01".format(workspace_id) + + api_client.credentials( + HTTP_AUTHORIZATION="Bearer {}".format(test_connection.access_token) + ) + + LastExportDetail.objects.get(workspace_id=workspace_id) + # last_exported_at=datetime.now(), total_expense_groups_count=1 + + TaskLog.objects.create(type='CREATING_EXPENSE_REPORT', status='COMPLETE', workspace_id=workspace_id) + + failed_count = TaskLog.objects.filter(workspace_id=workspace_id, status__in=['FAILED', 'FATAL']).count() + + response = api_client.get(url) + assert response.status_code == 200 + + response = json.loads(response.content) + assert response['repurposed_successful_count'] == 1 + assert response['repurposed_failed_count'] == failed_count + assert response['repurposed_last_exported_at'] is not None + + def test_import_code_field_view(db, mocker, api_client, test_connection): """ Test ImportCodeFieldView diff --git a/workers/export/worker.py b/workers/export/worker.py index 62ce5b4f..a411fb17 100644 --- a/workers/export/worker.py +++ b/workers/export/worker.py @@ -33,13 +33,6 @@ def process_message(self, routing_key: str, event: BaseEvent, delivery_tag: int) try: logger.info('Processing task for workspace - %s with routing key - %s and payload - %s with delivery tag - %s', payload_dict['workspace_id'], routing_key, payload_dict, delivery_tag) - # We're gonna retry failed events since this queue is primarily webhook calls from Fyle, if this is a scheduled export, it doesn't necessarily needs to be retried - retry_count = payload_dict.get('retry_count', 0) - if retry_count >= 2: - logger.error('Task failed after %s retries, dropping task', retry_count) - self.qconnector.reject_message(delivery_tag, requeue=False) - return - handle_exports(payload_dict['data']) self.qconnector.acknowledge_message(delivery_tag) logger.info('Task processed successfully for workspace - %s with routing key - %s and delivery tag - %s', payload_dict['workspace_id'], routing_key, delivery_tag) From 7a6267b47dc9582e414bebc93136a69323724948 Mon Sep 17 00:00:00 2001 From: ashwin1111 Date: Wed, 21 May 2025 22:44:40 +0530 Subject: [PATCH 2/3] fix lint --- tests/test_workspaces/test_views.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_workspaces/test_views.py b/tests/test_workspaces/test_views.py index 1855917e..9d4a50fd 100644 --- a/tests/test_workspaces/test_views.py +++ b/tests/test_workspaces/test_views.py @@ -1,5 +1,4 @@ import json -from datetime import datetime from unittest import mock from django.urls import reverse From 07aad31beb50a4663ef8c4149bb47c19fdc68894 Mon Sep 17 00:00:00 2001 From: ashwin1111 Date: Wed, 28 May 2025 14:56:54 +0530 Subject: [PATCH 3/3] fix turn off bug --- apps/workspaces/tasks.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/workspaces/tasks.py b/apps/workspaces/tasks.py index d014fa0d..26843e33 100644 --- a/apps/workspaces/tasks.py +++ b/apps/workspaces/tasks.py @@ -69,15 +69,15 @@ def schedule_sync(workspace_id: int, schedule_enabled: bool, hours: int, email_a ws_schedule, _ = WorkspaceSchedule.objects.get_or_create( workspace_id=workspace_id ) + ws_schedule.is_real_time_export_enabled = is_real_time_export_enabled + ws_schedule.enabled = schedule_enabled schedule_email_notification(workspace_id=workspace_id, schedule_enabled=schedule_enabled, hours=hours) if schedule_enabled: - ws_schedule.enabled = schedule_enabled ws_schedule.start_datetime = datetime.now() ws_schedule.interval_hours = hours ws_schedule.emails_selected = emails_selected - ws_schedule.is_real_time_export_enabled = is_real_time_export_enabled if email_added: ws_schedule.additional_email_options.append(email_added) @@ -105,10 +105,11 @@ def schedule_sync(workspace_id: int, schedule_enabled: bool, hours: int, email_a elif not schedule_enabled and ws_schedule.schedule: schedule = ws_schedule.schedule - ws_schedule.enabled = schedule_enabled ws_schedule.schedule = None ws_schedule.save() schedule.delete() + else: + ws_schedule.save() return ws_schedule