liren zhang
04/01/2021, 10:28 PM
eli
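04/01/2021, 11:04 PM
from typing import List

from dask.distributed import Client, Future
from prefect import Flow, task
from prefect.executors import LocalExecutor

# get_next, func and param_1 are defined elsewhere (not shown)
@task(checkpoint=False)
def long_running_dask_task(inputs: dict, client: Client) -> bool:
    futures: List[Future] = []
    while True:
        item = get_next(inputs)
        if not item:
            break
        f = client.submit(func, item)
        futures.append(f)
    client.gather(futures)
    return True

with Flow('local-dask-flow') as flow:
    # DaskCluster is the resource manager from the resource-manager docs example
    with DaskCluster(...) as client:
        long_running_dask_task(param_1, client)

flow.executor = LocalExecutor()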
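You can try the submit-then-gather loop in this task without a cluster. The following is a minimal sketch that swaps Dask for the standard library's concurrent.futures, so it is an analogue and not the Dask API itself. Here pool.submit plays the role of client.submit, and collecting f.result() plays the role of client.gather. The func and get_next helpers below are hypothetical stand-ins for the ones the snippet leaves undefined.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Iterator, List, Optional


def func(item: int) -> int:
    # Hypothetical stand-in for the real per-item work.
    return item * item


def make_get_next(items: List[int]) -> Callable[[], Optional[int]]:
    # Hypothetical stand-in for get_next(): returns the next item, or None when exhausted.
    it: Iterator[int] = iter(items)
    return lambda: next(it, None)


def run_all(items: List[int], max_workers: int = 4) -> List[int]:
    get_next = make_get_next(items)
    futures: List[Future] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while True:
            item = get_next()
            if item is None:  # explicit None check: a falsy item such as 0 is still submitted
                break
            futures.append(pool.submit(func, item))
        # Analogue of client.gather(futures): block until every future has a result.
        return [f.result() for f in futures]


print(run_all([1, 2, 3, 4]))  # [1, 4, 9, 16]
```

One design difference from the snippet: this loop stops on `is None`, not on a plain truthiness check. As a result, an item such as 0 or an empty dict is still submitted instead of ending the loop early.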
https://docs.prefect.io/core/idioms/resource-manager.html#example-creating-a-temporary-dask-cluster
Jonathan Chu
04/02/2021, 12:34 AM
CA Lee
04/02/2021, 1:21 AM
matta
04/02/2021, 1:47 AM
mapped state?
Jonathan Chu
04/02/2021, 1:48 AM
RUN tab with Docker doesn't seem to parse the JSON version of the environment variables correctly; it seems to keep the wrapping quotes as part of the value.
Jeremy Tee
04/02/2021, 7:47 AM
Prefect Cloud, and my flow invokes AWS Lambda and returns the response. However, after registering my flow, whenever I try to run it, it throws Unexpected error: TypeError("cannot serialize '_io.BufferedReader' object")
Is there a workaround for this?
import json

import boto3
from prefect import Flow, task
from prefect.executors import LocalExecutor
from prefect.run_configs import LocalRun
from prefect.storage import S3

@task(name="invoke_lambda")
def invoke_lambda(function_name, table_path, etl_target_date):
    lambda_client = boto3.client("lambda")
    response = lambda_client.invoke(
        FunctionName=function_name,
        Payload=json.dumps({"table_path": table_path, "etl_target_date": etl_target_date}),
    )
    return response

with Flow(
    "test-flow",
    executor=LocalExecutor(),
    run_config=LocalRun(),
    storage=S3(
        bucket="random-bucket",
    ),
) as flow:
    x = invoke_lambda("test", "a/b/c", "2021/04/02")

flow.register(project_name="xxx", labels=["dev"])
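The error most likely comes from returning the raw invoke() response. boto3 returns its Payload as a botocore StreamingBody, which is an open stream rather than data, and a stream cannot be pickled when the task result is serialized after the run. One possible workaround is to read the stream inside the task and return only plain data, e.g. end invoke_lambda with `return serializable_lambda_response(response)`. This is a minimal sketch: the helper name and the keys it returns are illustrative choices, and io.BytesIO stands in for StreamingBody so the example runs offline.

```python
import io
import json
from typing import Any, Dict


def serializable_lambda_response(response: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a boto3 Lambda invoke() response into plain, picklable data.

    The Payload entry is a stream: read it once here and decode its JSON body.
    """
    body = response["Payload"].read()  # bytes
    return {
        "status_code": response.get("StatusCode"),
        "function_error": response.get("FunctionError"),  # only present when the function raised
        "payload": json.loads(body) if body else None,
    }


# Offline check: io.BytesIO stands in for boto3's StreamingBody.
fake = {"StatusCode": 200, "Payload": io.BytesIO(b'{"rows": 3}')}
print(serializable_lambda_response(fake))
# {'status_code': 200, 'function_error': None, 'payload': {'rows': 3}}
```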
Matthew Blau
04/02/2021, 1:34 PM
@task(max_retries=3), but how can I set up retry logic if I am not defining tasks with the functional API? I do not see anything in the docs that explains this. Thank you in advance!
Marwan Sarieddine
04/02/2021, 4:13 PM
Nikola Milushev
04/02/2021, 4:34 PM
@task(max_retries=6, retry_delay=timedelta(minutes=10), timeout=60); however, it seems the task run gets stuck before it can trigger the retry. Could there be a reason for this other than the OOM Killer, as in a similar thread from 15.03?
Jay Shah
04/03/2021, 11:41 PM
Unexpected error: TypeError('execute() takes no keyword arguments')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 299, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 454, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/sql_server/sql_server.py", line 90, in run
    executed = cursor.execute(query=query, vars=data)
TypeError: execute() takes no keyword arguments
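The last frame points at the likely cause. The task calls cursor.execute(query=query, vars=data), which matches psycopg2's keyword names. However, the SQL Server task appears to be built on pyodbc, whose C-implemented Cursor.execute accepts positional arguments only (cursor.execute(sql, *params)).

The sketch below reproduces the same class of error with the standard library's sqlite3. sqlite3 is used purely as a stand-in, because its execute() is also positional-only; this is not pyodbc itself. Until the task is fixed, a workaround would presumably be to run the query through the driver directly with a positional call.

```python
import sqlite3


def keyword_call_rejected(cursor: sqlite3.Cursor) -> bool:
    """Return True if execute() refuses psycopg2-style keyword arguments."""
    try:
        cursor.execute(query="SELECT ?", vars=(1,))  # same call shape as in the traceback
    except TypeError:
        return True
    return False


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
print(keyword_call_rejected(cur))  # True

# Positional call: SQL text first, then the parameters.
cur.execute("SELECT ?", (1,))
print(cur.fetchone())  # (1,)
conn.close()
```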
tash lai
04/05/2021, 6:01 AM
produce stay in memory until the flow finishes, or will it be removed as soon as consume finishes?
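from prefect import Flow, Parameter, task

# download_big_json and do_something are defined elsewhere (not shown)
@task
def produce(url):
    return download_big_json(url)

@task
def consume(big_json):
    do_something(big_json)

with Flow('my_flow') as flow:
    urls = Parameter('urls')
    produced = produce.map(urls)
    consume.map(produced)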
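In plain Python terms, produce's output can only be freed once nothing references it any more, e.g. once the flow runner no longer holds each task's state and result. The sketch below illustrates that rule with weakref. The results dict is a hypothetical stand-in for whatever a runner retains; it is not a description of Prefect's actual internals.

```python
import gc
import weakref


class BigJson(dict):
    """Hypothetical stand-in for the large document returned by produce()."""


def still_alive_after_consume(runner_keeps_results: bool) -> bool:
    results = {}  # plays the role of a runner's record of task outputs
    produced = BigJson(items=list(range(100_000)))
    ref = weakref.ref(produced)  # observe the object without keeping it alive
    if runner_keeps_results:
        results["produce"] = produced
    del produced  # consume() has finished and dropped its reference
    gc.collect()
    return ref() is not None


print(still_alive_after_consume(runner_keeps_results=True))   # True
print(still_alive_after_consume(runner_keeps_results=False))  # False
```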
Varun Joshi
04/05/2021, 8:20 AM
Jeremy Tee
04/05/2021, 10:00 AM
Rob Fowler
04/05/2021, 12:16 PM
haf
04/05/2021, 5:11 PM
Brett Naul
04/05/2021, 6:12 PM
Joseph Loss
04/05/2021, 6:54 PM
Tomás Emilio Silva Ebensperger
04/05/2021, 9:43 PM
Zach Hodowanec
04/05/2021, 9:54 PM
Willian Chan
04/05/2021, 9:59 PM
GitLab
The UI tells me: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'mail_client'").
The main problem is that mail_client.py is not present on the agent, and it is impractical for me to copy every auxiliary script to the agent (there are going to be a lot of flows).
The structure of the repository:
gitlab-repository/
├── flow.py
└── mail_client.py
My flow.py imports mail_client:
from mail_client import MailClient
...
...
The configuration for GitLab storage:
flow.storage = GitLab(
    repo="XXXXX",
    host="XXXXX",
    path="flow.py",
    secrets=["GITLAB_ACCESS_TOKEN"]
)
I need the agent to be able to pull the entire repository, because many processes will be added to Prefect and there is no way to update the agent every time a process changes.
Does anyone have a solution for this? Thanks!
Jonathan Chu
04/06/2021, 12:40 AM
matta
04/06/2021, 1:51 AM
DaskExecutor in a Jupyter notebook and I'm using threads (cluster_kwargs={"processes": False}), then I see the logs in the notebook. If I take that off, though, the logs disappear. How do I get the logs to show up in the notebook again?
Wolfgang Steitz
04/06/2021, 9:36 AM
Levi Leal
04/06/2021, 11:04 AM
logger = prefect.context.get('logger')
logger.addHandler(log_handler)
I need something like this:
log_handler = logging.StreamHandler()
log_handler.setFormatter(DatadogFormatter())
get_logger().addHandler(log_handler)
I add the handler to the 'root' logger and everything is logged the way I need.
I've tried the latter and it works fine with flow.run(), but when I register the flow I can't get it to work on k8s.
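Here is a minimal, self-contained sketch of the handler setup described above, using only the standard library. JsonFormatter is a hypothetical stand-in for DatadogFormatter, whose code isn't shown. The logger name "prefect" is an assumption, based on Prefect 1.x naming its loggers prefect.<Name> (e.g. prefect.TaskRunner); attaching the handler to that parent lets child loggers' records reach it through propagation. Whatever the setup, it has to run in the process that actually executes the flow run. On Kubernetes, that is the job's container, not the machine where the flow was registered.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Hypothetical stand-in for DatadogFormatter: one JSON object per record."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "logger": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
        })


stream = io.StringIO()  # stands in for stdout/stderr in a real deployment
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

# Assumption: Prefect 1.x loggers are children of the "prefect" logger,
# so a handler attached here also receives records from e.g. "prefect.TaskRunner".
prefect_logger = logging.getLogger("prefect")
prefect_logger.setLevel(logging.INFO)
prefect_logger.addHandler(handler)

logging.getLogger("prefect.TaskRunner").info("task started")
print(stream.getvalue().strip())
# {"logger": "prefect.TaskRunner", "level": "INFO", "message": "task started"}
```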
More details in the thread.
haf
04/06/2021, 1:07 PM
Marwan Sarieddine
04/06/2021, 2:10 PM
v0.14.15
Basically, the failure happens at the state-handler level, after the flow has finished running. Please see the traceback in the thread.
Florian Kühnlenz
04/06/2021, 3:00 PM
prefect register --module work.
My project looks like this:
flows
+ __init__.py
+ my_flow.py
+ shared_tasks
+ __init__.py
+ util.py
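With this layout, 'flows.my_flow' can only be imported when the directory that contains flows/ is on sys.path. Presumably prefect register -m imports the module by its dotted name. A console script such as prefect does not put the current directory on sys.path, so running the command from the project root with PYTHONPATH=. set is a likely fix (not confirmed in this thread). The sketch reproduces the difference with importlib and a throwaway package; the file contents are made up.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def import_or_none(name: str):
    """Return the imported module, or None if it cannot be found."""
    try:
        return importlib.import_module(name)
    except ModuleNotFoundError:
        return None


def demo() -> tuple:
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        pkg = root / "flows"
        pkg.mkdir()
        (pkg / "__init__.py").write_text("")
        (pkg / "my_flow.py").write_text("NAME = 'my_flow'\n")

        before = import_or_none("flows.my_flow")  # project root is not on sys.path yet

        sys.path.insert(0, str(root))  # what running with PYTHONPATH=<project root> achieves
        importlib.invalidate_caches()
        try:
            module = import_or_none("flows.my_flow")
            after = getattr(module, "NAME", None)
        finally:
            # Undo the changes so the demo leaves the interpreter as it found it.
            sys.path.remove(str(root))
            sys.modules.pop("flows.my_flow", None)
            sys.modules.pop("flows", None)
        return before, after


print(demo())  # (None, 'my_flow')
```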
When I run prefect register --project 'Prefect Testing' -m 'flows.my_flow', I get No module named 'flows'. What am I missing?
Andor Tóth
04/06/2021, 4:01 PM
import csv
from pathlib import Path

import sqlalchemy as sqla
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

# DSN and OUT_DIR are defined elsewhere (not shown)
SQL_DIR = Path('sql')

@task
def list_query_names():
    return [f.name for f in SQL_DIR.glob('*.sql')]

@task(log_stdout=True, timeout=15, task_run_name='{name}-{date:%F_%T}', checkpoint=False)
def exec_query(name: str):
    sql = (SQL_DIR / name).read_text()
    print('Query name: %s' % name)
    engine = sqla.create_engine(DSN)
    rs = engine.execute(sql)
    return dict(keys=rs.keys(), rows=rs.fetchall())

@task
def save_results(rs, name):
    # newline='' as recommended for files passed to csv.writer
    with (OUT_DIR / name).with_suffix('.txt').open('w', newline='') as f:
        csv_writer = csv.writer(f, delimiter="\t")
        csv_writer.writerow(rs['keys'])
        csv_writer.writerows(rs['rows'])

with Flow("Queries") as flow:
    query_names = list_query_names()
    results = exec_query.map(query_names)
    save_results.map(results, query_names)

# LocalDaskExecutor's keyword is `scheduler`, not `schedule`
flow.executor = LocalDaskExecutor(num_workers=2, scheduler='processes')
flow.run()
Robin
04/06/2021, 4:13 PM
How do you build your Prefect flows so that the Docker images run on the desired (possibly different) architectures and operating systems?