Python asyncio
With asyncio, while we wait, the event loop can do other tasks
- asyncio.get_event_loop
- asyncio.sleep
- call_later
| hello world |
|---|
| import asyncio
import logging
# Root logger: DEBUG and above, rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    level=logging.DEBUG,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s - %(levelname)s - %(message)s",
)
async def main():
    """Log a greeting, schedule a delayed callback, and finish after a 2 s sleep.

    The callback is registered with ``call_later`` for one second in the
    future, so the event loop runs it while this coroutine is still
    suspended in ``asyncio.sleep(2)``.
    """

    def delayed_callback():
        logging.debug("Immediate callback executed")

    logging.debug("Hello ...")
    loop = asyncio.get_event_loop()
    loop.call_later(1.0, delayed_callback)
    await asyncio.sleep(2)
    logging.debug("... World!")


if __name__ == "__main__":
    asyncio.run(main())
|
The lambda callback runs after 1 second, even though `main` is still suspended in `asyncio.sleep(2)`:
| 2026-01-20 16:55:01 - DEBUG - Hello ...
2026-01-20 16:55:02 - DEBUG - Immediate callback executed
2026-01-20 16:55:03 - DEBUG - ... World!
|
gather
code
| import asyncio
import logging
# Show every record from DEBUG upwards with a timestamp and level prefix.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
async def say_hello_async():
    """Log a start message, sleep for two seconds, then log the greeting."""
    pause_seconds = 2  # Simulated wait
    logging.debug("Preparing Async World! and go to sleep...")
    await asyncio.sleep(pause_seconds)
    logging.debug("Hello, Async World!")
async def do_something_else():
    """Log a start message, sleep for one second, then log completion."""
    pause_seconds = 1  # Simulated unit of other work
    logging.debug("Starting another task...")
    await asyncio.sleep(pause_seconds)
    logging.debug("Finished another task!")
async def main():
    """Run both demo coroutines concurrently and wait until both have finished."""
    # gather() schedules both coroutines on the loop, so they run concurrently
    await asyncio.gather(
        say_hello_async(),
        do_something_else(),
    )


# Guarded so that importing this module does not start the event loop.
if __name__ == "__main__":
    asyncio.run(main())
|
Future
future with result
| import asyncio
async def producer(fut: asyncio.Future):
    """Announce work, sleep for two seconds, then resolve *fut* with a greeting."""
    print("Producer: working...")
    await asyncio.sleep(2)
    payload = "Hello from the future!"
    # Setting the result completes the future
    fut.set_result(payload)
async def consumer(fut: asyncio.Future):
    """Announce the wait, suspend until *fut* has a result, then print it."""
    print("Consumer: waiting for result...")
    # Suspends here until the future is completed
    result = await fut
    print(f"Consumer: got -> {result}")
async def main():
    """Create one future and run its producer and consumer concurrently."""
    loop = asyncio.get_running_loop()
    # Create an empty Future; producer() sets its result, consumer() awaits it
    fut = loop.create_future()
    await asyncio.gather(
        producer(fut),
        consumer(fut),
    )


# Guarded so that importing this module does not start the event loop.
if __name__ == "__main__":
    asyncio.run(main())
|
future with exception
| import asyncio
async def producer(fut: asyncio.Future):
    """Announce work, sleep for one second, then fail *fut* with a RuntimeError."""
    print("Producer: working...")
    await asyncio.sleep(1)
    # Complete the future with an error instead of a result
    failure = RuntimeError("Something went wrong")
    fut.set_exception(failure)
async def consumer(fut: asyncio.Future):
    """Await *fut* and print the exception it raises, if any."""
    try:
        # Awaiting a future that holds an exception re-raises it here
        await fut
    except Exception as err:
        print("Consumer caught:", err)
async def main():
    """Create one future and run the failing producer alongside the consumer."""
    loop = asyncio.get_running_loop()
    # Create an empty Future; producer() sets an exception on it, consumer() awaits it
    fut = loop.create_future()
    await asyncio.gather(
        producer(fut),
        consumer(fut),
    )


# Guarded so that importing this module does not start the event loop.
if __name__ == "__main__":
    asyncio.run(main())
|
future with callback
| import asyncio
def on_ready(fut: asyncio.Future):
    """Done-callback: announce completion and print the result of *fut*."""
    print("Future is ready!")
    outcome = fut.result()
    print("Result:", outcome)
async def producer(fut: asyncio.Future):
    """Sleep for one second, then resolve *fut* with the string "done"."""
    await asyncio.sleep(1)
    outcome = "done"
    fut.set_result(outcome)
async def main():
    """Attach ``on_ready`` to a fresh future and let the producer resolve it."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Register the callback before the producer sets the result
    fut.add_done_callback(on_ready)
    await producer(fut)


# Guarded so that importing this module does not start the event loop.
if __name__ == "__main__":
    asyncio.run(main())
|
future completed thread-safely from another thread
| import asyncio
def external_thread(loop, fut):
    """Ask *loop* to set the result of *fut* to 99 via ``call_soon_threadsafe``."""
    resolve = fut.set_result
    # The future is not touched directly; the call is handed to the loop instead
    loop.call_soon_threadsafe(resolve, 99)
async def main():
    """Start a thread that resolves a future, then wait for that future."""
    import threading  # Only needed inside this function

    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    fut.add_done_callback(lambda f: print("Result:", f.result()))
    # The thread receives the loop so it can hand the set_result call back to it
    threading.Thread(target=external_thread, args=(loop, fut)).start()
    await fut


# Guarded so that importing this module does not start the event loop.
if __name__ == "__main__":
    asyncio.run(main())
|
Mixing
Mixing Async and Sync:
Register a coroutine in the loop from a worker thread
| import asyncio
import threading
import time
async def job(msg: str):
    """Sleep for 0.2 seconds, then print *msg* after an "Async job:" prefix."""
    delay_seconds = 0.2
    await asyncio.sleep(delay_seconds)
    print("Async job:", msg)
def worker(loop: asyncio.AbstractEventLoop):
    """Submit five ``job`` coroutines to *loop* from the calling thread.

    Sleeps 0.5 seconds (blocking) before each submission.

    Args:
        loop: The event loop, running in another thread, that executes the jobs.
    """
    for i in range(5):
        time.sleep(0.5)
        # run_coroutine_threadsafe() is the thread-safe way to submit a
        # coroutine to a loop owned by another thread. It returns a
        # concurrent.futures.Future, unused here because the jobs are
        # fire-and-forget.
        asyncio.run_coroutine_threadsafe(job(f"hello {i}"), loop)
async def main():
    """Start the worker thread and keep the loop running for three seconds."""
    loop = asyncio.get_running_loop()
    # daemon=True: a still-running worker does not keep the interpreter alive
    threading.Thread(target=worker, args=(loop,), daemon=True).start()
    # Stay in the loop while the worker thread submits its jobs
    await asyncio.sleep(3)


# Guarded so that importing this module does not start the event loop.
if __name__ == "__main__":
    asyncio.run(main())
|
Wait and Timeout
simple wait_for
| import asyncio
async def slow_task():
    """Sleep for two seconds and return the string "done"."""
    await asyncio.sleep(2)
    outcome = "done"
    return outcome
async def main():
    """Run ``slow_task`` with a 1-second timeout and print the outcome.

    Prints the task's result on success, or "Timed out!" when ``wait_for``
    raises ``asyncio.TimeoutError``.
    """
    try:
        # Only the awaited call can raise the timeout, so it alone sits in the try
        result = await asyncio.wait_for(slow_task(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Timed out!")
    else:
        print(result)


# Guarded so that importing this module does not start the event loop.
if __name__ == "__main__":
    asyncio.run(main())
|
wait for the first
| import asyncio
async def worker(name, delay):
    """Sleep for *delay* seconds, then return the string "<name> finished"."""
    await asyncio.sleep(delay)
    message = f"{name} finished"
    return message
async def main():
    """Start three workers, print whichever finish first, and cancel the rest."""
    tasks = [
        asyncio.create_task(worker("A", 2)),
        asyncio.create_task(worker("B", 1)),
        asyncio.create_task(worker("C", 3)),
    ]
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )
    # One or more tasks finished
    for task in done:
        print("First result:", task.result())
    # Optional: cancel the rest
    for task in pending:
        task.cancel()
    # Await the cancelled tasks; return_exceptions=True keeps their
    # CancelledError from propagating out of gather()
    await asyncio.gather(*pending, return_exceptions=True)


# Guarded so that importing this module does not start the event loop.
if __name__ == "__main__":
    asyncio.run(main())
|