Riley Hun
08/08/2020, 12:51 AM
# parent class
class ABC:
    def __init__(
        self,
        user,
        password
    ):
        self.user = user
        self.password = password

    def query(self, query):
        pass
I want to do something like this:
# task
class Task_A(Task, ABC):
    def run(self):
        pass
Currently, I'm just doing this instead:
@task
def task_a(
    user: str = None,
    password: str = None,
    date_col: str = None,
    dataset_id: str = None
):
    conn = ABC(user=user, password=password)
    query = f"SELECT DISTINCT {date_col} FROM EDW_DNA_DB.WI.{dataset_id}"
    query_result = conn.query(query)
    return query_result[date_col].tolist()
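One way to get close to the Task_A(Task, ABC) idea without multiple inheritance is composition: the task class keeps the credentials and opens the connection inside run. The sketch below is only an illustration under stated assumptions. Task here is a small stand-in so the example runs without Prefect installed (it is not prefect.Task), and Connection, DistinctDates, and the echoed query result are hypothetical names that mirror the snippets above.

```python
class Task:
    """Stand-in for a task base class (assumption; this is not prefect.Task)."""

    def __init__(self, name=None):
        self.name = name or type(self).__name__

    def run(self, **kwargs):
        raise NotImplementedError


class Connection:
    """Mirrors the parent class from the message above; query() is a placeholder."""

    def __init__(self, user, password):
        self.user = user
        self.password = password

    def query(self, sql):
        # A real implementation would execute the SQL; this one just echoes it.
        return {"sql": sql, "user": self.user}


class DistinctDates(Task):
    """Task that stores credentials and opens the connection when it runs."""

    def __init__(self, user, password, **kwargs):
        super().__init__(**kwargs)
        self.user = user
        self.password = password

    def run(self, date_col, dataset_id):
        conn = Connection(user=self.user, password=self.password)
        sql = f"SELECT DISTINCT {date_col} FROM EDW_DNA_DB.WI.{dataset_id}"
        return conn.query(sql)


# Hypothetical credentials and column/table names, for illustration only.
task_a = DistinctDates(user="me", password="secret", name="task_a")
result = task_a.run(date_col="LOAD_DATE", dataset_id="SALES")
```

Creating the connection inside run rather than in __init__ keeps the task object itself lightweight, which may matter if task objects are serialized before they execute.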
bral
08/08/2020, 9:21 AM
Qwame
08/09/2020, 9:06 AM
F1 >> [f2, f3, f4, f5, f6]
What's the best way to set these dependencies in Prefect? I notice that set_downstream doesn't accept a list of tasks. Is there an efficient way to do this in Prefect?
Also, does the new Prefect UI mean I don't need Docker to run it?
Thanks
Vikram Iyer
08/10/2020, 6:05 AM
Vikram Iyer
08/10/2020, 6:05 AM
emmanuel
08/10/2020, 7:20 AM
Lewis Bails
08/10/2020, 8:26 AM
flow.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=5)
flow.storage = Docker(dockerfile='./dockerfile', env_vars={'known_hosts': #known_hosts, 'ssh_prv_key': #prv_key})
flow.register(project_name='Automated training')
[2020-08-10 08:07:11] INFO - prefect.Docker | Building the flow's Docker storage...
Traceback (most recent call last):
File "/home/leb/anaconda3/envs/ai-pipelines/lib/python3.7/site-packages/docker/api/client.py", line 261, in _raise_for_status
response.raise_for_status()
File "/home/leb/.local/lib/python3.7/site-packages/requests/models.py", line 941, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: <http+docker://localhost/v1.40/build?t=auto-dask-kube-cloud%3A2020-08-10t08-07-11-222981-00-00&q=False&nocache=False&rm=False&forcerm=True&pull=False&dockerfile=.%2Ftmpez2venih%2FDockerfile>
Can anyone help me out?
Roy
08/10/2020, 11:00 AM
merge two task results together in a single one, the merge is supposed to return the first "real" result. What is considered a "real" result? It seems that only None is considered a non-real result. I would have expected that NoResult results would also be skipped, but this is not the case. Is this intended? Example in thread.
Amit
08/10/2020, 11:24 AM
logger = logging.getLogger(__name__)
I have this in code everywhere and I don't want to replace it with from prefect.utilities.logging import get_logger everywhere. Is there a way to send the default logger messages to cloud?
Jim Klassen
08/10/2020, 2:23 PM
Jim Klassen
08/10/2020, 2:25 PM
Marwan Sarieddine
08/10/2020, 3:44 PM
$ kubectl logs pod/prefect-job-d45c5eda-cxn9x -n prefect
Usage: prefect execute [OPTIONS] COMMAND [ARGS]...
Try 'prefect execute -h' for help.
Error: No such command 'flow-run'.
bral
08/10/2020, 3:47 PM
Philip MacMenamin
08/10/2020, 4:23 PM
list as a Parameter? (As opposed to a str - or do you just pass a string and split?)
Marwan Sarieddine
08/10/2020, 5:27 PM
Marwan Sarieddine
08/10/2020, 5:49 PM
Philip MacMenamin
08/10/2020, 6:36 PM
with Flow("STL Flow") as f:
    files = Parameter("files")
f.run(files='fname.txt')
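For the list-versus-string question above, one option is to accept either form and normalise it in a small helper before the files are used. This is a minimal sketch, not part of Prefect: as_file_list is a hypothetical helper name, and the comma separator is an assumption.

```python
def as_file_list(value, sep=","):
    """Return a list of file names from a list/tuple or a delimited string."""
    if value is None:
        return []
    if isinstance(value, str):
        # Split on the separator and drop empty or whitespace-only pieces.
        return [part.strip() for part in value.split(sep) if part.strip()]
    return list(value)


single = as_file_list("fname.txt")
several = as_file_list("a.stl, b.stl")
already_list = as_file_list(["a.stl", "b.stl"])
```

With a helper like this, a single file name such as 'fname.txt' and a real list are handled the same way downstream.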
Riley Hun
08/10/2020, 8:57 PM
Riley Hun
08/11/2020, 4:49 AM
Lewis Bails
08/11/2020, 6:05 AM
emre
08/11/2020, 7:52 AM
0.13.1, I can’t get the slack_notifier to work. This used to work in 0.12.1. Here is a minimal example of a flow:
from prefect import Flow, task, context
from prefect.utilities.notifications import slack_notifier

@task
def log_run_id():
    context.get("logger").warning(context.get("flow_run_id"))

@task(state_handlers=[slack_notifier])
def get_1():
    return 1

with Flow("slack_example") as flow:
    log = log_run_id()
    result = get_1(upstream_tasks=[log])

flow.run()
Apparently slack_notifier attempts to communicate with a backend server, but I am not going to use one. More info in thread.
bral
08/11/2020, 9:35 AM
emmanuel
08/11/2020, 11:31 AM
emmanuel
08/11/2020, 2:29 PM
Is PREFECT_SERVER__GRAPHQL_URL the right env variable for the UI docker image? It seems to be the one used in the docker compose file, but when run outside of docker (so not on localhost), it doesn’t seem to be working 😕
Richard Hughes
08/11/2020, 2:55 PM
Chris Martin
08/11/2020, 2:59 PM
task.map, but I can't get it to work and wonder if this is possible.
Here's some example code:
from prefect import task, Flow, Task

@task
def task1():
    pass

@task
def task2():
    return ["a", "b", "c"]

@task
def task3(x):
    pass

with Flow("test") as flow:
    t1 = task1()
    params = task2()
    task3.map(params, upstream_tasks=[t1])
I can register this flow fine and the DAG looks correct (see attachment), but when I run it, task3 fails with "No upstream states can be mapped over". Am I doing something wrong here?
Akshay Verma
08/11/2020, 3:12 PM
emmanuel
08/11/2020, 3:53 PM
apollo OSS as well?
emmanuel
08/11/2020, 4:05 PM
Hannah Amundson
08/11/2020, 4:54 PM
prefect.context.get("logger")
How do we get that logger inside of each of our classes/functions that are being called within tasks? Is the best practice to just pass it in as a parameter?
Jim Crist-Harif
08/11/2020, 5:20 PM
Task) you can use the logger attribute directly. Otherwise either getting from prefect.context or calling prefect.utilities.logging.get_logger directly should work the same.
alex
08/11/2020, 5:56 PM
log_stdout for your tasks
Hannah Amundson
08/12/2020, 4:11 PM