Zach Khorozian
04/01/2021, 2:49 PM
Luis Gallegos
04/01/2021, 4:58 PM
from prefect import task, Flow, Parameter
from prefect.executors import LocalDaskExecutor, DaskExecutor
from prefect.tasks.prefect import StartFlowRun
import prefect

executor = LocalDaskExecutor(num_workers=1)

flow1 = StartFlowRun("flow1", project_name='test', wait=True)
flow2 = StartFlowRun("flow2", project_name='test', wait=True)

with Flow("example", executor=executor) as flow:
    table_dict_param_list = []
    with open('parameters.txt', 'r') as f:
        lines = f.readlines()
        for cnt, line in enumerate(lines):
            dict_param = {}
            dict_param['param1'] = cnt
            dict_param['param2'] = line
            table_dict_param_list.append(dict_param)
    flow1 = flow1()
    ## i need this execution to be sequential like in a "for loop"
    flow2.map(parameters=table_dict_param_list)

flow.register(project_name="test")
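The dict-building loop in the flow above can be pulled out into a plain function, which keeps the file parsing out of the Flow block and makes it testable on its own. A minimal sketch; build_parameter_dicts is a hypothetical name, and stripping the trailing newline that readlines() keeps is an assumption about what param2 should hold.

```python
from typing import Dict, Iterable, List, Union


def build_parameter_dicts(lines: Iterable[str]) -> List[Dict[str, Union[int, str]]]:
    """Turn each line into {"param1": line_index, "param2": line_text}.

    Assumption: the trailing newline kept by readlines() is not wanted,
    so it is stripped from param2.
    """
    return [
        {"param1": cnt, "param2": line.rstrip("\n")}
        for cnt, line in enumerate(lines)
    ]


params = build_parameter_dicts(["alpha\n", "beta\n"])
print(params)  # [{'param1': 0, 'param2': 'alpha'}, {'param1': 1, 'param2': 'beta'}]
```

Inside the flow, table_dict_param_list = build_parameter_dicts(f.readlines()) would replace the inner loop. For strictly sequential runs, one option (assuming Prefect 1.x, where calling a task inside a Flow block accepts upstream_tasks) is a plain Python loop in place of flow2.map(...): start with prev = flow1(), then for each dict p set prev = flow2(parameters=p, upstream_tasks=[prev]).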
Nicholas Chammas
04/01/2021, 5:17 PM
idempotency_key=flow.serialized_hash() the default?
https://docs.prefect.io/orchestration/concepts/flows.html#core-client
I can’t think of why someone would want the Flow version to change if the Flow definition hasn’t.
Riley Hun
04/01/2021, 5:58 PM
prefect-server-towel seems to have an ErrImagePull error, and prefect-server-hasura and prefect-server-ui can't pull their respective images from the registry.
Here's the deployment command I entered:
helm repo add prefecthq https://prefecthq.github.io/server/
helm install ${NAME} prefecthq/prefect-server --values=values.yaml
liren zhang
04/01/2021, 10:28 PM
eli
04/01/2021, 11:04 PM
from typing import List

from dask.distributed import Client, Future
from prefect import task, Flow
from prefect.executors import LocalExecutor

@task(checkpoint=False)
def long_running_dask_task(inputs: dict, client: Client) -> bool:
    futures: List[Future] = []
    while True:
        next_input = get_next(inputs)
        if not next_input:
            break
        f = client.submit(func, next_input)
        futures.append(f)
    client.gather(futures)
    return True

with Flow('local-dask-flow') as flow:
    with DaskCluster(...) as client:
        long_running_dask_task(param_1, client)
flow.executor = LocalExecutor()
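The loop in long_running_dask_task follows a submit-then-gather pattern: queue up futures until the input runs out, then wait on all of them. A minimal stdlib sketch of that pattern, with concurrent.futures.ThreadPoolExecutor standing in for the Dask Client (Client.submit and Client.gather have the same shape) and a hypothetical zero-argument get_next that returns None when nothing is left. It checks `is None` rather than truthiness, so a falsy item such as 0 is still processed, unlike `if not next_input`.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List, Optional


def submit_until_exhausted(
    executor: ThreadPoolExecutor,
    get_next: Callable[[], Optional[int]],
    func: Callable[[int], int],
) -> List[int]:
    """Submit func(item) for each item from get_next(), then wait for all results."""
    futures: List[Future] = []
    while True:
        item = get_next()
        if item is None:  # stop only on None, so a falsy item like 0 is still submitted
            break
        futures.append(executor.submit(func, item))
    # Counterpart of client.gather(futures): block until every future is done;
    # result() re-raises the exception if a submitted call failed.
    return [f.result() for f in futures]


items = iter([3, 1, 0, 2])
with ThreadPoolExecutor(max_workers=2) as pool:
    results = submit_until_exhausted(pool, lambda: next(items, None), lambda x: x * x)
print(results)  # [9, 1, 0, 4]
```

Results come back in submission order, not completion order, because the list comprehension walks the futures in the order they were created.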
https://docs.prefect.io/core/idioms/resource-manager.html#example-creating-a-temporary-dask-cluster
Jonathan Chu
04/02/2021, 12:34 AM
CA Lee
04/02/2021, 1:21 AM
matta
04/02/2021, 1:47 AM
mapped state?
Jonathan Chu
04/02/2021, 1:48 AM
RUN tab with Docker doesn't seem to parse the JSON version of the environment variables correctly; it seems to keep the wrapping quotes as part of the value.
Jeremy Tee
04/02/2021, 7:47 AM
Prefect Cloud, and my flow will invoke AWS Lambda and return the response to me. However, after registering my flow, whenever I try to run it, it throws Unexpected error: TypeError("cannot serialize '_io.BufferedReader' object")
Is there a workaround for this?
import json

import boto3
from prefect import task, Flow
from prefect.executors import LocalExecutor
from prefect.run_configs import LocalRun
from prefect.storage import S3

@task(name="invoke_lambda")
def invoke_lambda(function_name, table_path, etl_target_date):
    lambda_client = boto3.client("lambda")
    response = lambda_client.invoke(
        FunctionName=function_name,
        Payload=json.dumps({"table_path": table_path, "etl_target_date": etl_target_date}),
    )
    return response

with Flow(
    "test-flow",
    executor=LocalExecutor(),
    run_config=LocalRun(),
    storage=S3(
        bucket="random-bucket",
    ),
) as flow:
    x = invoke_lambda("test", "a/b/c", "2021/04/02")

flow.register(project_name="xxx", labels=["dev"])
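Assuming the error comes from Prefect serializing the task's return value (the raw invoke() response carries "Payload" as a stream object), one workaround is to have the task return plain data instead. A minimal sketch of that conversion as a standalone function; lambda_response_to_data is a hypothetical name, it assumes the Lambda function returns JSON, and io.BytesIO stands in for botocore's streaming body so the example runs without AWS. Inside invoke_lambda, the task would return lambda_response_to_data(response) instead of response.

```python
import io
import json
import pickle
from typing import Any, Dict


def lambda_response_to_data(response: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a Lambda invoke() response into plain, picklable data.

    Assumes the function returned JSON. Only a few response fields are kept;
    the stream under "Payload" is read once and decoded.
    """
    payload_bytes = response["Payload"].read()
    return {
        "StatusCode": response.get("StatusCode"),
        "FunctionError": response.get("FunctionError"),
        "Payload": json.loads(payload_bytes) if payload_bytes else None,
    }


# Stand-in response: io.BytesIO plays the role of the streaming body.
fake_response = {"StatusCode": 200, "Payload": io.BytesIO(b'{"rows": 42}')}
data = lambda_response_to_data(fake_response)
print(data)  # {'StatusCode': 200, 'FunctionError': None, 'Payload': {'rows': 42}}
pickle.dumps(data)  # succeeds: only dicts, ints, strings and None remain
```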
Matthew Blau
04/02/2021, 1:34 PM
@task(max_retries=3), but how can I set up the retry logic if I am not setting up tasks with the functional API? I don't see anything in the docs that explains this. Thank you in advance!
Marwan Sarieddine
04/02/2021, 4:13 PM
Nikola Milushev
04/02/2021, 4:34 PM
@task(max_retries=6, retry_delay=timedelta(minutes=10), timeout=60), however it seems the task run gets stuck before it can trigger the retry. Could there be another reason for this besides the OOM Killer, as in a similar topic from 15.03?
Jay Shah
04/03/2021, 11:41 PM
Unexpected error: TypeError('execute() takes no keyword arguments')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 299, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 454, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/sql_server/sql_server.py", line 90, in run
    executed = cursor.execute(query=query, vars=data)
TypeError: execute() takes no keyword arguments
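The last frame shows the SQL Server task calling cursor.execute(query=query, vars=data). DB-API drivers such as pyodbc implement execute() in C and accept only positional arguments, which matches this TypeError. As a hedged illustration, here is the same call shape using the standard library's sqlite3 as a stand-in for pyodbc (the events table is made up): the keyword form is rejected, the positional form works.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE events (id INTEGER, name TEXT)")

query = "INSERT INTO events (id, name) VALUES (?, ?)"
data = (1, "signup")

# Keyword form, as in the traceback: the C-implemented execute() rejects it.
try:
    cursor.execute(query=query, vars=data)
    keyword_call_failed = False
except TypeError:
    keyword_call_failed = True

# Positional form: the SQL first, then the parameter sequence.
cursor.execute(query, data)
conn.commit()

rows = cursor.execute("SELECT id, name FROM events").fetchall()
print(keyword_call_failed, rows)  # True [(1, 'signup')]
conn.close()
```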
tash lai
04/05/2021, 6:01 AM
produce stay in memory until the flow finishes, or will it be removed as soon as consume finishes?
from prefect import task, Flow, Parameter

@task
def produce(url):
    return download_big_json(url)

@task
def consume(big_json):
    do_something(big_json)

with Flow('my_flow') as flow:
    urls = Parameter('urls')
    produced = produce.map(urls)
    consume.map(produced)
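Whether produce's output can be freed comes down to whether anything still references it after consume finishes. If the flow runner keeps each task's state and result until the run ends (an assumption about the runner, worth checking in the Prefect source), the object lives that long. The plain-Python sketch below only shows the underlying reference rule, using weakref to observe when an object is freed; BigJson and the results dict are hypothetical stand-ins.

```python
import gc
import weakref


class BigJson:
    """Hypothetical stand-in for the object produce() returns."""

    def __init__(self, payload):
        self.payload = payload


def consume(big_json):
    return len(big_json.payload)


# Case 1: a bookkeeping dict (like a runner holding every task's result)
# still references the object, so it survives after consume() returns.
results = {"produce": BigJson("x" * 1000)}
ref = weakref.ref(results["produce"])
consume(results["produce"])
gc.collect()
alive_while_referenced = ref() is not None

# Case 2: once the last reference is dropped, the object is freed.
del results["produce"]
gc.collect()
freed_after_release = ref() is None

print(alive_while_referenced, freed_after_release)  # True True
```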
Varun Joshi
04/05/2021, 8:20 AM
Jeremy Tee
04/05/2021, 10:00 AM
Rob Fowler
04/05/2021, 12:16 PM
haf
04/05/2021, 5:11 PM
Brett Naul
04/05/2021, 6:12 PM
Joseph Loss
04/05/2021, 6:54 PM
Tomás Emilio Silva Ebensperger
04/05/2021, 9:43 PM
Zach Hodowanec
04/05/2021, 9:54 PM
Willian Chan
04/05/2021, 9:59 PM
GitLab
The UI tells me: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'mail_client'").
The main problem is that the file mail_client.py is not present on the agent, and it is impractical for me to send each auxiliary script to the agent (there are going to be a lot of flows).
The structure of the repository:
gitlab-repository/
├── flow.py
└── mail_client.py
Inside flow.py, I import mail_client:
from mail_client import MailClient
...
...
The configuration for GitLab storage:
from prefect.storage import GitLab

flow.storage = GitLab(
    repo="XXXXX",
    host="XXXXX",
    path="flow.py",
    secrets=["GITLAB_ACCESS_TOKEN"]
)
I need the agent to be able to pull the entire repository, because many processes will be added to Prefect and there is no way to update the agent for every change to a process.
Does anyone have a solution for this? Thanks
Jonathan Chu
04/06/2021, 12:40 AM
matta
04/06/2021, 1:51 AM
DaskExecutor in a Jupyter notebook and I'm using threads (cluster_kwargs={"processes": False}), then I see the logs in the notebook. If I take that off, though, the logs disappear. How do I get the logs to show up in the notebook again?
Wolfgang Steitz
04/06/2021, 9:36 AM
Levi Leal
04/06/2021, 11:04 AM
logger = prefect.context.get('logger')
logger.addHandler(log_handler)
I need something like this:
log_handler = logging.StreamHandler()
log_handler.setFormatter(DatadogFormatter())
get_logger().addHandler(log_handler)
I add the handler to the 'root' logger and everything is logged the way I need.
I've tried the latter and it works fine with flow.run(), but when I register the flow I can't get it to work on k8s.
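The second snippet relies on standard logging propagation: a handler attached to an ancestor logger (the root logger, or any parent) also receives records from descendant loggers that don't turn propagation off. A stdlib-only sketch of that mechanism, with a hypothetical JsonFormatter standing in for DatadogFormatter and a made-up logger name. It does not explain the k8s difference; there, the handler setup would presumably have to run inside the flow-run process itself, not only in the process that calls flow.run().

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Hypothetical stand-in for DatadogFormatter: one JSON object per record."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "logger": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
        })


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)

# A child logger with no handlers of its own: its record propagates up to
# the root logger's handler, so it comes out JSON-formatted.
logging.getLogger("my_flow.tasks").info("task started")

line = json.loads(stream.getvalue().splitlines()[0])
print(line)  # {'logger': 'my_flow.tasks', 'level': 'INFO', 'message': 'task started'}
root.removeHandler(handler)
```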
More details in the thread
haf
04/06/2021, 1:07 PM
haf
04/06/2021, 1:07 PM
Kevin Kho
04/06/2021, 2:19 PM
haf
04/06/2021, 2:20 PM
Tyler Wanner
04/06/2021, 5:20 PM
haf
04/06/2021, 5:25 PM
Tyler Wanner
04/06/2021, 5:26 PM
haf
04/06/2021, 5:26 PM
Tyler Wanner
04/06/2021, 5:27 PM
/usr/local/lib/python3.8/site-packages/prefect/agent/kubernetes/job_template.yaml
That's the install location for Python 3.8, but the prefect image runs Python 3.7 by default. If you mount it to /usr/local/lib/python3.7/site-packages/prefect/agent/kubernetes/job_template.yaml, then you don't have to change the args at all (this is equivalent to overwriting the default file, which, as you said, works).
haf
04/19/2021, 4:57 PM
Tyler Wanner
04/19/2021, 10:21 PM