a = 3
b = 'hola'
c = f'{b} {a} veces'  # 'hola 3 veces' ("hello 3 times")
import os
from string import Template


class TemplatesMixin:
    TEMPLATES_DIR = None

    def _read_template(self, template_path):
        with open(os.path.join(self.TEMPLATES_DIR, template_path)) as template:
            return template.read()

    def render(self, template_path, **kwargs):
        return Template(
            self._read_template(template_path)
        ).substitute(**kwargs)
<html>
  <head></head>
  <body>
    Hi $name
  </body>
</html>
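A minimal sketch of the substitution that render performs, assuming the template text is held in an inline string instead of being read from TEMPLATES_DIR. The names page, rendered and partly_rendered are only illustrative.

```python
from string import Template

# Inline stand-in for the template file shown above (an assumption for brevity).
page = Template("<body> Hi $name </body>")

# substitute() raises KeyError when a placeholder has no value;
# safe_substitute() leaves unknown placeholders untouched instead.
rendered = page.substitute(name="Ana")
partly_rendered = Template("Hi $name, $greeting").safe_substitute(name="Ana")

print(rendered)         # <body> Hi Ana </body>
print(partly_rendered)  # Hi Ana, $greeting
```

Whether render should use substitute or safe_substitute depends on whether a missing value should fail loudly or pass through.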
>>> x = {'a': 1, 'b': 2}
>>> y = {'b': 3, 'c': 4}
>>> z = {**x, **y}
>>> z
{'a': 1, 'b': 3, 'c': 4}
>>> a = [1, 2]
>>> b = [3, 4]
>>> a + b
[1, 2, 3, 4]
from dataclasses import dataclass

@dataclass
class Process:
    PID: int
    PPID: int
    cmd: str
The __init__ method will already be in your class.
Note that type hints are required here, which is why I have used int and str in the example. If you don't know the type of a field, you can use Any from the typing module.
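A short sketch of what the decorator generates, assuming an extra field named extra (hypothetical, not part of the original example) to show Any in use:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Process:
    PID: int
    PPID: int
    cmd: str
    extra: Any = None  # hypothetical field whose type is not known in advance


# __init__, __repr__ and __eq__ are generated from the annotated fields.
proc = Process(42, 1, "bash")
same = proc == Process(42, 1, "bash")

print(proc.PID, proc.cmd, proc.extra)  # 42 bash None
print(same)                            # True
```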
The Data Class has many advantages compared to the proposed solutions:
You can add documentation to the parameters and the return value:
def add(a:"first number" = 0, b:"second number" = 0) -> "sum of a and b":
    return a+b

for item in add.__annotations__.items():
    print(item)
# ('a', 'first number')
# ('b', 'second number')
# ('return', 'sum of a and b')
Since Python 3.5 we can write co-routines with the async/await keywords. The asyncio package provides the tools to run and manage them.
Co-routines are functions that, similarly to green threads, run within a single thread and process. They are cheaper than threads because they are scheduled by the event loop rather than by the operating system.
For advanced use, go to Advanced asyncio.
- A co-routine is declared with the async keyword before a function definition.
- The await keyword marks a point where it is safe to switch to another co-routine.
- Awaitables are objects that can be used in an await expression. There are: co-routines, tasks, and futures.

async def a_greet(name):
    print(f'Hello {name}')

print(a_greet('Potato'))
# <coroutine object a_greet at 0x7f0c19673170>
import asyncio

async def a_greet(name):
    print(f'Hello {name}')

asyncio.run(a_greet('Potato'))
# Hello Potato
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    await say_after(1, f"{time.strftime('%X')}: hello")
    await say_after(2, f"{time.strftime('%X')}: world")
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
# started at 11:31:12
# 11:31:12: hello
# 11:31:13: world
# finished at 11:31:15
import asyncio

async def nested():
    return 42

async def main():
    nested()  # RuntimeWarning: coroutine 'nested' was never awaited. It was never called.
    print(await nested())  # will print "42".

asyncio.run(main())
import asyncio

async def nested(value):
    print(value)

async def main():
    tasks = [nested(i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())
async def func():
    async with db.connect() as conn:
        async for user in db.query_users():
            print(user.name)
Cancelling
Shielding: asyncio.shield(async_func_call) protects async_func_call from being cancelled. If the co-routine that awaits the shielded call is cancelled, async_func_call keeps running anyway.
Set a timeout with the asyncio.wait_for co-routine.
Get all unfinished tasks with asyncio.all_tasks.
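A hedged sketch of how a timeout and a shield interact. The slow co-routine and the delays are made up for illustration; the point is that wait_for cancels what it awaits on timeout, unless that work is wrapped in asyncio.shield.

```python
import asyncio


async def slow(delay):
    # Hypothetical stand-in for a long-running operation.
    await asyncio.sleep(delay)
    return "done"


async def main():
    # On timeout, wait_for cancels slow() and raises TimeoutError.
    try:
        await asyncio.wait_for(slow(0.2), timeout=0.05)
        outcome = "finished"
    except asyncio.TimeoutError:
        outcome = "timed out"

    # With shield the timeout still fires, but the inner task keeps running.
    inner = asyncio.create_task(slow(0.1))
    try:
        await asyncio.wait_for(asyncio.shield(inner), timeout=0.01)
    except asyncio.TimeoutError:
        pass
    shielded_result = await inner  # the shielded task was not cancelled
    return outcome, shielded_result


outcome, shielded_result = asyncio.run(main())
print(outcome, shielded_result)  # timed out done
```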
Tasks are used to schedule co-routines.
You create tasks with the asyncio.create_task function.
import asyncio

async def nested(value):
    return value

async def main():
    task = asyncio.create_task(nested(3))
    await task
    print(task.result())

asyncio.run(main())
Interesting methods are:
- cancel()
- cancelled(), done(), to check the status
- add_done_callback

While a Task is running in the event loop, no other Tasks can run in the same thread. When a Task executes an await expression, the running Task gets suspended, and the event loop executes the next Task.
To schedule a callback from a different OS thread, use loop.call_soon_threadsafe().
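The Task methods listed above can be sketched as follows. The work co-routine and the finished list are assumptions made for the example.

```python
import asyncio


async def work():
    # Hypothetical long-running job that will be cancelled before it ends.
    await asyncio.sleep(10)


async def main():
    finished = []
    task = asyncio.create_task(work())
    # The callback receives the task once it is done, also when cancelled.
    task.add_done_callback(lambda t: finished.append(t.cancelled()))
    await asyncio.sleep(0)  # let the task start
    task.cancel()           # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0)  # give the loop a chance to run the callback
    return task.done(), task.cancelled(), finished


status = asyncio.run(main())
print(status)  # (True, True, [True])
```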
You can run co-routines thanks to the event loop.
The most basic function for running a co-routine in the event loop is asyncio.run. It takes a main co-routine, which in turn calls other co-routines. asyncio.run is a high-level function, and high-level functions are the recommended way to use asyncio.
Functions to manage the event loop are:
- asyncio.get_running_loop() returns the running event loop of the current thread. If there is none, an exception is raised.
- asyncio.get_event_loop() returns the event loop of the current thread. If it does not exist, one will be created (this implicit creation is deprecated in recent Python versions).
- asyncio.set_event_loop(loop) sets loop as the current event loop for the current thread.
- asyncio.new_event_loop() creates and returns a new event loop.

Methods of an event loop:

- run_until_complete(future)
- stop()
- is_running()
- close()
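A small sketch of driving a loop by hand with the methods above, assuming a trivial answer co-routine. In most programs asyncio.run does all of this for you.

```python
import asyncio


async def answer():
    return 42


loop = asyncio.new_event_loop()
try:
    # Runs the loop until the co-routine finishes and returns its result.
    result = loop.run_until_complete(answer())
    running = loop.is_running()  # False: the loop stopped once answer() was done
finally:
    loop.close()  # a loop created by hand must be closed by hand

closed = loop.is_closed()
print(result, running, closed)  # 42 False True
```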
call_soon and call_soon_threadsafe schedule a synchronous function to be called at the next iteration of the loop. call_soon_threadsafe must be used to schedule callbacks from another thread.
import asyncio

def prnt(value):
    print(f"Hola {value}")

async def arange(count):
    for i in range(count):
        yield i

async def main():
    asyncio.get_event_loop().call_soon(prnt, "hello")
    prnt("hola")
    async for i in arange(3):
        print(i)
        await asyncio.sleep(0)

asyncio.run(main(), debug=True)
# Hola hola
# 0
# Hola hello
# 1
# 2
call_later and call_at schedule a non-async function to be called after a given delay or at a given loop time, respectively.
create_task and create_future create tasks and futures attached to the loop. There is also set_task_factory if you want to create your own task instances.
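A minimal sketch of both scheduling calls, with made-up delays and a calls list used only to record the order in which the callbacks run:

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    calls = []
    # call_later takes a delay in seconds; call_at takes an absolute loop time.
    loop.call_later(0.02, calls.append, "later")
    loop.call_at(loop.time() + 0.01, calls.append, "at")
    # Both return a handle that can cancel the pending callback.
    handle = loop.call_later(0.01, calls.append, "cancelled")
    handle.cancel()
    await asyncio.sleep(0.1)  # keep the loop alive long enough for the callbacks
    return calls


calls = asyncio.run(main())
print(calls)  # ['at', 'later']
```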
A Future represents an eventual result of an asynchronous operation.
import asyncio

async def set_after(fut, delay, value):
    await asyncio.sleep(delay)
    fut.set_result(value)

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # We could have just used "asyncio.create_task()".
    loop.create_task(set_after(fut, 1, '... world'))
    print('hello ...')
    # Wait until *fut* has a result (1 second) and print it.
    print(await fut)

asyncio.run(main())
An instance of Executor provides methods to execute calls asynchronously. It should not be used directly, but through its concrete subclasses.
import asyncio
import concurrent.futures

# blocking_io and cpu_bound are assumed to be regular (non-async) functions
# defined elsewhere.

async def main():
    loop = asyncio.get_running_loop()

    # 1. Run in the default executor (None). run_in_executor returns a Future.
    result = await loop.run_in_executor(None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
        print('custom process pool', result)
ThreadPoolExecutor uses a pool of threads in the same way that ProcessPoolExecutor uses a pool of processes.
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
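For comparison, a self-contained sketch of submit and map on a thread pool. The square function is a hypothetical workload chosen so the results are easy to check.

```python
from concurrent.futures import ThreadPoolExecutor


def square(n):
    return n * n


with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(square, 7)             # returns a Future immediately
    squares = list(executor.map(square, range(5)))  # results keep the input order
    single = future.result()                        # blocks until the call finishes

print(single, squares)  # 49 [0, 1, 4, 9, 16]
```

The same code should work with ProcessPoolExecutor, as long as the submitted function can be pickled.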
These allow you to manage I/O resources (sockets) asynchronously.
You can use the usual primitives to synchronize co-routines (Lock, Event, Condition, Semaphore, BoundedSemaphore).
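As an illustration of one of these primitives, here is a sketch that uses a Semaphore to cap concurrency. The worker function and the state dictionary are assumptions made for the example.

```python
import asyncio


async def worker(sem, state):
    async with sem:  # at most two workers are inside this block at once
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)
        state["active"] -= 1


async def main():
    sem = asyncio.Semaphore(2)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(5)))
    return state["peak"]


peak = asyncio.run(main())
print(peak)  # 2
```

No lock is needed around the counter updates, because co-routines only switch at await points.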
Asynchronous queues that can be used to distribute workload between several concurrent tasks:
import asyncio
import random
import time

async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

asyncio.run(main())
Old-style co-routines are based on generators. Since Python 3.5 they are not useful anymore, and the @asyncio.coroutine decorator was removed in Python 3.11:
@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()
You can debug co-routines using the following settings:
- Pass debug=True to asyncio.run().
- Call loop.set_debug().

With this:
functools.partial creates a function that requires no arguments from one that requires them, by binding the arguments in advance.
loop.call_soon(partial(print, "Hello", flush=True))
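A standalone sketch of the same idea outside asyncio. The power function is a hypothetical example, not part of the notes above.

```python
from functools import partial


def power(base, exponent):
    return base ** exponent


square = partial(power, exponent=2)     # one argument bound, one still required
two_to_the_ten = partial(power, 2, 10)  # all arguments bound: call with none

print(square(5))         # 25
print(two_to_the_ten())  # 1024
```

This is handy with call_soon and similar scheduling calls, which pass positional arguments only, so keyword arguments have to be bound through partial.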
import asyncio
import functools
import os
import signal

def ask_exit(signame, loop):
    print("got signal %s: exit" % signame)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()
    for signame in {'SIGINT', 'SIGTERM'}:
        loop.add_signal_handler(
            getattr(signal, signame),
            functools.partial(ask_exit, signame, loop))
    await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")

asyncio.run(main())
from enum import Enum

class Numbers(Enum):
    ONE = 1
    TWO = 2

number = Numbers(2)  # <Numbers.TWO: 2>
number1 = Numbers["ONE"]  # <Numbers.ONE: 1>
members = [n for n in Numbers]  # [<Numbers.ONE: 1>, <Numbers.TWO: 2>]
values = [n.value for n in Numbers]  # [1, 2]
pi: float = 3.142
def headline(text: str, align: bool = True) -> str:
    pass
The mypy tool statically checks the type hints in code files.
>>> names: list = ["Guido", "Jukka", "Ivan"]
>>> version: tuple = (3, 7, 1)
>>> options: dict = {"centered": False, "capitalize": True}
>>> from typing import Dict, List, Tuple
>>> names: List[str] = ["Guido", "Jukka", "Ivan"]
>>> version: Tuple[int, int, int] = (3, 7, 1)
>>> options: Dict[str, bool] = {"centered": False, "capitalize": True}
import random
from typing import Any, Sequence
def choose(items: Sequence[Any]) -> Any:
    return random.choice(items)
from datetime import date

class Animal:
    def __init__(self, name: str, birthday: date) -> None:
        self.name = name
        self.birthday = birthday

    @classmethod
    def newborn(cls, name: str) -> "Animal":
        return cls(name, date.today())

    def twin(self, name: str) -> "Animal":
        cls = self.__class__
        return cls(name, self.birthday)
def headline(text, width=80, fill_char="-"):
    # type: (str, int, str) -> str
    return f" {text.title()} ".center(width, fill_char)
def headline(
    text,           # type: str
    width=80,       # type: int
    fill_char="-",  # type: str
):                  # type: (...) -> str
    return f" {text.title()} ".center(width, fill_char)
$ python3 -m venv myenv
$ source myenv/bin/activate
To deactivate:
$ deactivate
# Replace python2 by python3
sudo ln -s /usr/bin/python3.5 /usr/bin/python
# Fix pip3
sudo apt-get remove python3-pip; sudo apt-get install python3-pip
# Install VirtualEnvWrapper for python3
sudo pip3 install virtualenvwrapper
You can force reinstall pip:
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python3 get-pip.py --force-reinstall
sudo python3 -m pip uninstall pip && sudo apt install python3-pip --reinstall
sudo python -m pip uninstall pip && sudo apt install python-pip --reinstall
__all__ is a list of strings defining which symbols in a module will be exported when from <module> import * is used on the module.
__all__ = ['bar', 'baz']

waz = 5
bar = 10

def baz():
    return 'baz'
These symbols can then be imported like so:
from foo import *

print(bar)
print(baz)

# The following will trigger an exception, as "waz" is not exported by the module
print(waz)
If the __all__ above is commented out, this code will then execute to completion, as the default behavior of import * is to import all symbols that do not begin with an underscore, from the given namespace.
__all__ affects the from <module> import * behavior only. Members that are not mentioned in __all__ are still accessible from outside the module and can be imported with from <module> import <member>.
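Both behaviours can be checked in a single file. This sketch builds the foo module in memory (an assumption made to keep the example self-contained; normally foo would be a foo.py file) and runs the two import forms with exec.

```python
import sys
import types

# Same content as the foo module shown above.
source = """
__all__ = ['bar', 'baz']
waz = 5
bar = 10
def baz():
    return 'baz'
"""
foo = types.ModuleType("foo")
exec(source, foo.__dict__)
sys.modules["foo"] = foo  # make "foo" importable without a file on disk

# The star import only brings in the names listed in __all__.
star_ns = {}
exec("from foo import *", star_ns)
star_names = sorted(name for name in star_ns if not name.startswith("__"))

# An explicit import still reaches names left out of __all__.
explicit_ns = {}
exec("from foo import waz", explicit_ns)

print(star_names)          # ['bar', 'baz']
print(explicit_ns["waz"])  # 5
```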
To install from downloaded packages:
pip install --no-index --find-links=/tmp/python -r requirements.txt