Last updated: Apr 11, 2024
Reading time·5 min
The message "DeprecationWarning: There is no current event loop" is shown
when you try to call the asyncio.get_event_loop()
method when there isn't a
running loop in the current thread.
Here is an example of when the warning is shown.
import asyncio async def func(index): print(f'Before {index}') await asyncio.sleep(0.1) print(f'After {index}...') loop = asyncio.get_event_loop() async def create_tasks_func(): tasks = [] for idx in range(3): tasks.append(asyncio.create_task(func(idx))) await asyncio.wait(tasks) # ⛔️ DeprecationWarning: There is no current event loop loop.run_until_complete(create_tasks_func()) loop.close()
The issue in the code sample is that starting with Python 3.11, an error is
raised if you call asyncio.get_event_loop
without a running event loop.
To resolve the issue:
asyncio.new_event_loop()
method to create and return a new event
loop object.asyncio.set_event_loop()
method to set the current event loop for
the current OS thread.import asyncio async def func(index): print(f'Before {index}') await asyncio.sleep(0.1) print(f'After {index}...') # 1) Create an event loop object ✅ loop = asyncio.new_event_loop() # 2) Set the current event loop for the current OS thread ✅ asyncio.set_event_loop(loop) async def create_tasks_func(): tasks = [] for idx in range(3): tasks.append(asyncio.create_task(func(idx))) await asyncio.wait(tasks) loop.run_until_complete(create_tasks_func()) loop.close()
The following line creates the event loop object.
# 1) Create an event loop object ✅ loop = asyncio.new_event_loop()
And the following line sets the current event loop for the current OS thread.
# 2) Set the current event loop for the current OS thread ✅ asyncio.set_event_loop(loop)
If I run the code sample with python main.py
, no warnings are shown.
python main.py
If you need your code to be able to run in Python version < 3.10 and Python >
3.10, use an if/else
statement when setting up the event loop.
import sys import asyncio if sys.version_info < (3, 10): loop = asyncio.get_event_loop() else: try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop)
The if
statement is run if the current Python version is less than 3.10
, so
we use the
asyncio.get_event_loop()
method.
Otherwise, the else
block runs.
We try to call the
asyncio.get_running_loop
method and if a RuntimeError
is raised, we use the asyncio.new_event_loop()
method.
The last step is to set the current event loop for the current OS thread in the
else
block.
You might also get the warning when using the asyncio.gather() method.
Since Python version 3.10, a deprecation warning is shown if:
If you got the warning when using gather()
, there is no event loop running at
the time you called the gather()
method.
Your code will continue to work until you switch to Python version 3.11.
To resolve the issue, await your call to the gather()
method from another
coroutine.
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({number}), currently i={i}...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") return f async def main(): # Schedule three calls *concurrently*: L = await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) print(L) asyncio.run(main())
By the time the asyncio.gather()
method is called, there is a running event
loop so the issue is resolved.
The example uses the asyncio.run() method to execute the supplied coroutine.
The asyncio.run()
method runs the supplied coroutine and takes care of:
asyncio
event loop.The error "RuntimeError: no running event loop" occurs when you try to add tasks to an event loop that isn't currently running.
To solve the error, call the create_task()
method on the event loop object
instead.
Here is an example of how the error occurs.
import asyncio async def example_func(index): print(f'Current index {index}') loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) tasks = [] # ⛔️ RuntimeError: no running event loop # sys:1: RuntimeWarning: coroutine 'example_func' was never awaited for idx in range(3): tasks.append(asyncio.create_task(example_func(idx))) loop.run_until_complete(asyncio.wait(tasks)) loop.close()
The loop in the example isn't running, so we can't add tasks to it.
We are adding tasks to the event loop and waiting for all of them to finish.
To solve the error, call the create_task
method on the created loop object
instead of on the asyncio
module.
import asyncio async def example_func(index): print(f'Current index {index}') loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) tasks = [] for idx in range(3): # ✅ Updated this line tasks.append(loop.create_task(example_func(idx))) loop.run_until_complete(asyncio.wait(tasks)) loop.close()
I replaced the following line:
# ⛔️ Incorrect tasks.append(asyncio.create_task(example_func(idx)))
With the following line
# ✅ Correct tasks.append(loop.create_task(example_func(idx)))
Notice that we are now calling the create_task
method on the created loop
object and not on the asyncio
module.
create_task
method on the loop
object, you create a task for the specific loop.Conversely, when you call the create_task
method on the asyncio
module, it
looks for a running event loop and doesn't find one which causes the error.
You can also solve the error by creating the tasks in an async
function.
import asyncio async def example_func(index): print(f'Before {index}') await asyncio.sleep(0.2) print(f'After {index}') loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) async def create_tasks(): tasks = [] for idx in range(3): tasks.append(asyncio.create_task(example_func(idx))) await asyncio.wait(tasks) loop.run_until_complete(create_tasks()) loop.close()
The create_tasks
function is also marked as async
.
By the time the asyncio.create_task()
method is run, the event loop is already
running so asyncio
is able to attach tasks to it.
get_running_loop
method without a running event loopThe "RuntimeError: no running event loop" also occurs when you call the
get_running_loop
method without a running event loop.
import asyncio async def example_func(): for idx in range(3): print(idx) await asyncio.sleep(1) # ⛔️ RuntimeError: no running event loop loop = asyncio.get_running_loop() loop.run_until_complete(example_func())
The asyncio.get_running_loop
method tries to get a running event loop but
doesn't find one, so a RuntimeError
is raised.
To solve the error:
import asyncio async def example_func(): for idx in range(3): print(idx) await asyncio.sleep(1) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(example_func())
The loop.run_until_complete method runs until the asyncio.Task has completed.
You can use another async
function if you need to call the
asyncio.get_running_loop()
method.
import asyncio async def example_func(): for idx in range(3): print(idx) await asyncio.sleep(1) async def get_loop(): loop_ = asyncio.get_running_loop() await loop_.create_task(example_func()) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(get_loop())
By the time the asyncio.get_running_loop()
method runs, there is a running
event loop, so the error is no longer raised.
You can learn more about the related topics by checking out the following tutorials: