menu

Questions & Answers

django channels Application instance took too long to shut down and was killed

I have a web server deployed on a remote host. It receives messages from a Windows app and forwards them to a front end running in the browser.

Lately there has been a slight delay in receiving and sending, and when I checked the Django logs I found the following message appearing from time to time:

aioredis==1.3.1
asgiref==3.4.1
channels==3.0.4
channels-redis==3.3.1
daphne==3.0.2
2022-03-03 21:04:54,257 ERROR    Exception inside application: Attempt to send on a closed protocol
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/channels/routing.py", line 71, in __call__
    return await application(scope, receive, send)
  File "/usr/local/lib/python3.8/site-packages/channels/sessions.py", line 47, in __call__
    return await self.inner(dict(scope, cookies=cookies), receive, send)
  File "/usr/local/lib/python3.8/site-packages/channels/sessions.py", line 263, in __call__
    return await self.inner(wrapper.scope, receive, wrapper.send)
  File "/usr/local/lib/python3.8/site-packages/channels/auth.py", line 185, in __call__
    return await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.8/site-packages/channels/middleware.py", line 26, in __call__
    return await self.inner(scope, receive, send)
  File "/usr/local/lib/python3.8/site-packages/channels/routing.py", line 150, in __call__
    return await application(
  File "/usr/local/lib/python3.8/site-packages/channels/consumer.py", line 94, in app
    return await consumer(scope, receive, send)
  File "/usr/local/lib/python3.8/site-packages/channels/consumer.py", line 58, in __call__
    await await_many_dispatch(
  File "/usr/local/lib/python3.8/site-packages/channels/utils.py", line 51, in await_many_dispatch
    await dispatch(result)
  File "/usr/local/lib/python3.8/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/app/backend/server/./script/consumers.py", line 242, in chat_message
    await self.send(text_data=json.dumps({
  File "/usr/local/lib/python3.8/site-packages/channels/generic/websocket.py", line 209, in send
    await super().send({"type": "websocket.send", "text": text_data})
  File "/usr/local/lib/python3.8/site-packages/channels/consumer.py", line 81, in send
    await self.base_send(message)
  File "/usr/local/lib/python3.8/site-packages/channels/sessions.py", line 226, in send
    return await self.real_send(message)
  File "/usr/local/lib/python3.8/site-packages/daphne/server.py", line 234, in handle_reply
    protocol.handle_reply(message)
  File "/usr/local/lib/python3.8/site-packages/daphne/ws_protocol.py", line 202, in handle_reply
    self.serverSend(message["text"], False)
  File "/usr/local/lib/python3.8/site-packages/daphne/ws_protocol.py", line 256, in serverSend
    self.sendMessage(content.encode("utf8"), binary)
  File "/usr/local/lib/python3.8/site-packages/autobahn/websocket/protocol.py", line 2217, in sendMessage
    raise Disconnected("Attempt to send on a closed protocol")
autobahn.exception.Disconnected: Attempt to send on a closed protocol

consumers.py


class ScriptConsumers(AsyncWebsocketConsumer):
    '''
    WebSocket consumer that stores incoming logs and relays them.

    Every connection joins the single channel-layer group 'logs'.
    Clients send JSON text frames carrying a 'command' key, which is
    dispatched through the `commands` table ('fetch_logs' / 'new_log').

    NOTE(review): send_logs_data() broadcasts with type 'chat_message',
    so a `chat_message` handler has to exist on this consumer. It is
    not part of this excerpt - confirm it is defined.
    '''

    def log_to_json(self, log):
        '''
        Convert a log object to a JSON-serializable dict
        (the same shape a serializer would produce).
        '''
        # presumably used by the elided fields below - TODO confirm
        created_at = log.get_date()
        return {
            # long data
        }

    def save_new_log(self, obj):
        '''
        Save the log described by `obj` to the database.

        obj: decoded client message; obj['from'] is used as the
             LogFor username.
        Returns a dict with 'command': 'new_log' and the saved log
        converted to JSON under 'log'.
        '''
        # get_or_create replaces the manual get / DoesNotExist / create
        # sequence; online=1 is applied only when the row is created.
        log_for, _ = LogFor.objects.get_or_create(
            username=obj['from'],
            defaults={'online': 1},
        )

        try:
            new_log = Logs.objects.create(
                logfor=log_for,
                # long data
            )
        except Exception:
            # Fallback insert; its arguments are elided in this excerpt.
            new_log = Logs.objects.create(
                # long data
            )
        return {
            'command': 'new_log',
            'log': self.log_to_json(new_log),
        }

    def get_latest_10_log(self, obj):
        '''
        Serialize every LogFor row with LogsForSerializers.

        `obj` is unused; it is kept so the method matches the command
        handler call shape.
        NOTE(review): the "latest 10" limit is not applied here -
        presumably it lives inside the serializer; confirm.
        '''
        rows = LogFor.objects.all()
        return LogsForSerializers(rows, many=True).data

    async def new_log(self, obj):
        '''
        Save the log off the event loop, then broadcast it to the group.
        '''
        content = await database_sync_to_async(self.save_new_log)(obj)
        await self.send_logs_data(content)

    async def fetch_logs(self, data):
        '''
        Load the serialized logs off the event loop and send them back
        on this WebSocket only.
        '''
        content = await database_sync_to_async(self.get_latest_10_log)(data)
        await self.send_logs(content)

    def _set_online(self, user, online):
        '''
        Set the `online` flag of the LogFor row whose username is `user`
        with underscores replaced by spaces.

        A missing row is ignored. Any other error propagates so that
        connect() / disconnect() can report it instead of it being
        silently swallowed.
        '''
        try:
            log_for = LogFor.objects.get(username=user.replace("_", " "))
        except LogFor.DoesNotExist:
            return
        log_for.online = online
        log_for.save()

    def update_user_incr(self, user):
        '''
        Mark `user` as online.
        '''
        self._set_online(user, 1)

    def update_user_decr(self, user):
        '''
        Mark `user` as offline.
        '''
        self._set_online(user, 0)

    async def connect(self):
        '''
        Mark the user online (best effort), join the 'logs' group and
        accept the WebSocket.
        '''
        # Always define the attribute so disconnect() can read it even
        # when the URL route carries no 'username' kwarg.
        self.username = None
        try:
            self.username = self.scope['url_route']['kwargs']['username']
            await database_sync_to_async(self.update_user_incr)(self.username)
        except Exception as e:
            print("Connect() : ", e)

        self.room_group_name = 'logs'  # all users share the same group
        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        '''
        Mark the user offline (best effort) and leave the 'logs' group.
        '''
        try:
            username = getattr(self, 'username', None)
            if username:
                await database_sync_to_async(self.update_user_decr)(username)
        except Exception as e:
            print("Disconnect() : ", e)
            print("Disconnect() close_Code : ", close_code)

        # Leave room group exactly once, whether or not the status
        # update above succeeded.
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    # Maps the 'command' value of a client message to its handler.
    # The values are plain functions, so receive() passes `self` explicitly.
    commands = {
        "fetch_logs": fetch_logs,
        "new_log": new_log
    }

    # Receive message from WebSocket
    async def receive(self, text_data=None, bytes_data=None):
        '''
        Decode a JSON text frame and dispatch it by its 'command' key.

        Binary frames, malformed JSON and unknown or missing commands
        are reported and ignored, so one bad client message does not
        take the consumer down.
        '''
        if text_data is None:
            print("receive() : ", "binary frames are not supported")
            return
        try:
            data = json.loads(text_data)
            handler = self.commands[data['command']]
        except (ValueError, KeyError, TypeError) as e:
            print("receive() : ", e)
            return
        await handler(self, data)

    # Broadcast a message to the room group
    async def send_logs_data(self, data):
        '''
        Send `data` to every consumer in the group as a 'chat_message'
        event; failures are reported and swallowed.
        '''
        try:
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'chat_message',
                    'message': data
                }
            )
        except Exception as e:
            print("send_logs_data() : ", e)

    async def send_logs(self, message):
        '''
        Send `message` as JSON to this WebSocket only; failures are
        reported and swallowed.
        '''
        try:
            await self.send(text_data=json.dumps(message))
        except Exception as e:
            print("send_logs() : ", e)

  

I have spent 3 days debugging without finding where this error comes from.


update 1 :

After checking, I found that the following warning is the cause:

WARNING  Application instance <Task pending name='Task-672341' coro=<ProtocolTypeRouter.__call__() running at /usr/local/lib/python3.8/site-packages/channels/routing.py:71> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.8/asyncio/futures.py:360, <TaskWakeupMethWrapper object at 0x7efebc730640>()]>> for connection <WebSocketProtocol client=['172.19.0.6', 43842] path=b'/ws/script/'> took too long to shut down and was killed.
WARNING  Application instance <Task pending name='Task-672341' coro=<ProtocolTypeRouter.__call__() running at /usr/local/lib/python3.8/site-packages/channels/routing.py:71> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.8/asyncio/futures.py:360, <TaskWakeupMethWrapper object at 0x7efebc730640>()]>> for connection <WebSocketProtocol client=['172.19.0.6', 43842] path=b'/ws/script/'> took too long to shut down and was killed.
WARNING  Application instance <Task pending name='Task-672341' coro=<ProtocolTypeRouter.__call__() running at /usr/local/lib/python3.8/site-packages/channels/routing.py:71> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.8/asyncio/futures.py:360, <TaskWakeupMethWrapper object at 0x7efebc730640>()]>> for connection <WebSocketProtocol client=['172.19.0.6', 43842] path=b'/ws/script/'> took too long to shut down and was killed.
WARNING  Application instance <Task pending name='Task-672341' coro=<ProtocolTypeRouter.__call__() running at /usr/local/lib/python3.8/site-packages/channels/routing.py:71> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.8/asyncio/futures.py:360, <TaskWakeupMethWrapper object at 0x7efebc730640>()]>> for connection <WebSocketProtocol client=['172.19.0.6', 43842] path=b'/ws/script/'> took too long to shut down and was killed.
Comments:
2023-01-25 00:55:07
What was the reason? It is not really that clear.
2023-01-25 00:55:07
I mean that the warning shown in update 1 is what causes the socket to be interrupted for a moment.
Answers(0) :