jspeis
12/11/2020, 6:20 PMAndrey Tatarinov
12/11/2020, 7:02 PMSanjay Patel
12/11/2020, 10:50 PMwith Flow('first') as flow:
    a = task_a()
    b = task_b(a)
    c = task_c(b)
    StartFlowRun(flow_name='second', project_name=...)(parameters={"input": c})
this is the closest to what I am trying to do but the response ended with using state_result.
with Flow('first') as flow:
    a = task_a()
    b = task_b(a)
    c = task_c(b)

with Flow('second') as flow:
    param = Parameter('input')
    d = task_d(param)

# How to do something like this:
flow_a = StartFlowRun(flow_name="first", project_name="examples", wait=True)
flow_b = StartFlowRun(flow_name="second", project_name="examples", wait=True)

with Flow('total') as flow:
    a = flow_a()
    b = flow_b()(upstream_tasks=[a], parameters={'input': a.d})
I have the option of changing one of my flows so it produces the output I need for the next flow. So, two questions:
• how does the output of flow_a get referenced in flow_b?
• how do you access the parameter input that comes from flow_a in flow_b?
thank you!Aiden Price
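One possible sketch for the two questions above (not a confirmed answer; it assumes task_a/task_b/task_c from the snippets are importable and that the 'second' flow is already registered): rather than reaching into flow_a's task results from the parent flow, compute the shared value in the parent flow itself and hand it to the child flow as a parameter via StartFlowRun, which is the pattern the first snippet was heading toward.
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

start_second = StartFlowRun(flow_name="second", project_name="examples", wait=True)

with Flow("total") as flow:
    a = task_a()
    b = task_b(a)
    c = task_c(b)
    # pass the in-flow result as the child flow's 'input' Parameter
    start_second(parameters={"input": c})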
12/12/2020, 12:23 AMWhen I add apply_map()
to my flow I get this error: ValueError: Cycle found; flows must be acyclic!
Unfortunately the stack trace doesn't provide any hints as to where I've accidentally created a cycle in the graph.
I've done some comment-driven-debugging and found that the problem is related to a case statement in the function I'm attempting to apply_map()
.
I've also done flow.visualize()
and it does output a picture but because it's a big flow it's hard to spot the cycle in there.
Does anyone have any hints on how to better debug cycles in flow graphs?
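One debugging idea, as a sketch (it assumes prefect 0.x exposes the task graph via flow.edges with upstream_task/downstream_task attributes, and that networkx is installed): rebuild the graph in networkx and let it name the offending cycle instead of eyeballing flow.visualize().
import networkx as nx

def find_flow_cycle(flow):
    # mirror the Prefect task graph as a networkx DiGraph
    g = nx.DiGraph()
    for edge in flow.edges:
        g.add_edge(edge.upstream_task, edge.downstream_task)
    try:
        # returns the edges that form a cycle, if any
        return nx.find_cycle(g, orientation="original")
    except nx.NetworkXNoCycle:
        return None

# cycle = find_flow_cycle(flow)
# if cycle:
#     for upstream, downstream, _ in cycle:
#         print(upstream.name, "->", downstream.name)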
Thanks in advance.Vincent
12/12/2020, 2:49 AMAndreas Jung
12/14/2020, 11:04 AMimport time
from base64 import b64decode
from prefect import Flow, task
import reportparser


@task
def fetch_message():
    ts = time.time()
    report_data = b64decode(
        b"ew4AAAAAAADmAgAABgAAAQAAH+EH3QAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
        b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIBPwgQAA4AAAAA"
    )
    print("fetch   ", time.time())
    print("fetch   ", time.time() - ts)
    return report_data


@task
def classify_message(msg):
    msg_type = 0
    ts = time.time()
    print("classify", time.time())
    print("classify", time.time() - ts)
    return msg_type


@task
def parse_message(msg, msg_type):
    ts = time.time()
    result = reportparser.parse(msg)
    print("parse   ", time.time())
    print("parse   ", time.time() - ts)
    return result


@task
def deliver_message(result):
    ts = time.time()
    print("deliver ", time.time())
    print("deliver ", time.time() - ts)


def main():
    with Flow("reportparser") as flow:
        msg = fetch_message()
        msg_type = classify_message(msg)
        result = parse_message(msg, msg_type)
        deliver_message(result)

    for i in range(20):
        print()
        ts = time.time()
        flow.run()
        print("total", time.time() - ts)


if __name__ == "__main__":
    main()
Execution time (absolute time, relative time per task):
fetch 1607943883.0552974
fetch 0.0001933574676513672
classify 1607943883.3690984
classify 0.00015401840209960938
parse 1607943883.6593442
parse 0.0009167194366455078
deliver 1607943883.9757109
deliver 0.0001609325408935547
total 1.3962466716766357
Christian
12/14/2020, 11:26 AMRunGreatExpectationsValidation
...
I always get a FAIL
state returned from executing it and no artifacts page. Is it to be expected that only a successful validation produces the markdown artifacts page?
I'm fighting with several aspects of my setup and it would be good to be able to inspect what the GE component is actually doing... Also, in general, I'd want to see the failed validation results...
ThanksAdam
12/14/2020, 12:05 PMMarc Lipoff
12/14/2020, 2:42 PMMarc Lipoff
12/14/2020, 2:42 PMAndrey Tatarinov
12/14/2020, 7:14 PMresult=GCSResult
and cache_for=timedelta(hours=1)
.
I notice that when I'm not rebuilding the Docker image, Prefect respects the cache, i.e. the second run goes much faster than the first. But it seems that each rebuild of the image invalidates the cache.
Q: is that true? How can I get more insight into how caching works?Will Milner
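For reference, a minimal sketch of the setup being described (assumptions: prefect 0.x output caching, the cached value persisted through the GCSResult so it can be found again across runs, and a bucket name that is purely illustrative):
from datetime import timedelta
from prefect import task
from prefect.engine.results import GCSResult
from prefect.engine.cache_validators import duration_only

@task(
    result=GCSResult(bucket="my-cache-bucket"),  # hypothetical bucket
    cache_for=timedelta(hours=1),
    cache_validator=duration_only,               # only the age of the cached value matters
)
def expensive_step():
    ...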
12/14/2020, 8:24 PMtask = ShellTask(return_all=True, log_stdout=True, log_stderr=True, stream_output=True)
with Flow("test") as flow:
    print_test = task(command="echo hi")
I expect to see hi
in my prefect logs but I see nothing. I have my logging level set to DEBUGLevi Leal
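A local-repro sketch for the message above (assumptions: prefect 0.x, where stream_output=True sends each line of command output through the task's logger, so a plain flow.run() should surface it; the captured lines are also available on the task state):
from prefect import Flow
from prefect.tasks.shell import ShellTask

shell = ShellTask(return_all=True, log_stdout=True, log_stderr=True, stream_output=True)

with Flow("test") as flow:
    print_test = shell(command="echo hi")

state = flow.run()
# inspect what the task actually returned, independent of logging
print(state.result[print_test].result)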
12/14/2020, 8:28 PM$ kubectl -n agent logs prefect-job-2d655498-sqqlc
[2020-12-14 20:20:37+0000] INFO - prefect.S3 | Downloading slow-flow/2020-12-07t12-16-09-863598-00-00 from lime-prefect
Unable to locate credentials
Traceback (most recent call last):
File "/usr/local/bin/prefect", line 8, in <module>
sys.exit(cli())
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 34, in flow_run
return _execute_flow_run()
File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 99, in _execute_flow_run
raise exc
File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 84, in _execute_flow_run
flow = storage.get_flow(storage.flows[flow_data.name])
File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/s3.py", line 105, in get_flow
self._boto3_client.download_fileobj(
File "/usr/local/lib/python3.8/site-packages/boto3/s3/inject.py", line 678, in download_fileobj
return future.result()
File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 106, in result
return self._coordinator.result()
File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 265, in result
raise self._exception
File "/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py", line 255, in _main
self._submit(transfer_future=transfer_future, **kwargs)
File "/usr/local/lib/python3.8/site-packages/s3transfer/download.py", line 340, in _submit
response = client.head_object(
File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 357, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 662, in _make_api_call
http, parsed_response = self._make_request(
File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 682, in _make_request
return self._endpoint.make_request(operation_model, request_dict)
File "/usr/local/lib/python3.8/site-packages/botocore/endpoint.py", line 102, in make_request
return self._send_request(request_dict, operation_model)
File "/usr/local/lib/python3.8/site-packages/botocore/endpoint.py", line 132, in _send_request
request = self.create_request(request_dict, operation_model)
File "/usr/local/lib/python3.8/site-packages/botocore/endpoint.py", line 115, in create_request
self._event_emitter.emit(event_name, request=request,
File "/usr/local/lib/python3.8/site-packages/botocore/hooks.py", line 356, in emit
return self._emitter.emit(aliased_event_name, **kwargs)
File "/usr/local/lib/python3.8/site-packages/botocore/hooks.py", line 228, in emit
return self._emit(event_name, kwargs)
File "/usr/local/lib/python3.8/site-packages/botocore/hooks.py", line 211, in _emit
response = handler(**kwargs)
File "/usr/local/lib/python3.8/site-packages/botocore/signers.py", line 90, in handler
return self.sign(operation_name, request)
File "/usr/local/lib/python3.8/site-packages/botocore/signers.py", line 162, in sign
auth.add_auth(request)
File "/usr/local/lib/python3.8/site-packages/botocore/auth.py", line 357, in add_auth
raise NoCredentialsError
botocore.exceptions.NoCredentialsError: Unable to locate credentials
Has anyone managed to make EKS Fargate work with Prefect agents?Levi Leal
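One hedged direction for the NoCredentialsError above (assumption: the flow-run pod on Fargate has no AWS credentials, so boto3 cannot read the flow from S3 storage): either attach an IAM role to the pod's service account (IRSA on EKS), or inject credentials into the job's environment through the run config, e.g.:
from prefect.run_configs import KubernetesRun

flow.run_config = KubernetesRun(
    env={
        "AWS_ACCESS_KEY_ID": "...",         # placeholder; prefer IRSA or a k8s Secret
        "AWS_SECRET_ACCESS_KEY": "...",     # placeholder
        "AWS_DEFAULT_REGION": "us-east-1",  # hypothetical region
    }
)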
12/14/2020, 8:28 PMBerty
12/14/2020, 11:00 PMwith Flow('...') as f:
    t = task(some_func)  # <-- returns tuple
    c = task(consume_func, keyword_tasks={'config': t[0]})
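A sketch of one way to consume a tuple-returning task (assuming prefect 0.x, where nout lets the task's result be unpacked inside the flow and indexing a task result inserts a GetItem task for you; some_func/consume_func stand in for the real functions):
from prefect import Flow, task

@task(nout=2)
def some_func():
    return "config-value", "other-value"

@task
def consume_func(config):
    print(config)

with Flow("tuple-demo") as f:
    cfg, other = some_func()   # unpacking works because of nout=2
    consume_func(config=cfg)   # pass one element as a keyword argument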
DJ Erraballi
12/14/2020, 11:34 PM2020-12-12T16:53:10.075-08:00 [2020-12-13 00:53:10] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}],)
2020-12-12T16:53:10.075-08:00 Traceback (most recent call last):
2020-12-12T16:53:10.075-08:00 File "/usr/local/lib/python3.6/site-packages/prefect/engine/cloud/task_runner.py", line 123, in call_runner_target_handlers
2020-12-12T16:53:10.075-08:00 cache_for=self.task.cache_for,
2020-12-12T16:53:10.075-08:00 File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 1104, in set_task_run_state
2020-12-12T16:53:10.075-08:00 version=version,
2020-12-12T16:53:10.075-08:00 File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 226, in graphql
2020-12-12T16:53:10.075-08:00 raise ClientError(result["errors"])
2020-12-12T16:53:10.075-08:00 prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}]
2020-12-12T16:53:10.075-08:00 ERROR:prefect.CloudTaskRunner:Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}],)
2020-12-12T16:53:10.075-08:00 Traceback (most recent call last):
2020-12-12T16:53:10.075-08:00 File "/usr/local/lib/python3.6/site-packages/prefect/engine/cloud/task_runner.py", line 123, in call_runner_target_handlers
2020-12-12T16:53:10.075-08:00 cache_for=self.task.cache_for,
2020-12-12T16:53:10.075-08:00 File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 1104, in set_task_run_state
2020-12-12T16:53:10.075-08:00 version=version,
2020-12-12T16:53:10.075-08:00 File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 226, in graphql
2020-12-12T16:53:10.075-08:00 raise ClientError(result["errors"])
2020-12-12T16:53:10.075-08:00 prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}]
2020-12-12T16:53:10.087-08:00 [2020-12-13 00:53:10] INFO - prefect.CloudTaskRunner | Task 'aggregate_patients': finished task run for task with final state: 'ClientFailed'
Ran into this yesterday; it seemed intermittent, but is this due to Prefect Cloud being down?jars
12/15/2020, 1:29 AMHui Zheng
12/15/2020, 2:21 AMVersion Locking
is it enforcing the assertion that a submitted/triggered flow run will run once and only once?
<https://docs.prefect.io/orchestration/concepts/flows.html#toggle-version-locking>
Robert Bastian
12/15/2020, 3:08 PMdef get_record_count():
    logger = prefect.context.get("logger")
    pg = PostgresFetch(host='hostname.us-east-1.rds.amazonaws.com', db_name='testdatabase', user='test')
    pg.query = "select count(*) from testdatabase.test_table"
    secret = PrefectSecret('TEST_PASSWORD')
    password = secret.run()
    result = pg.run(password=password)
    return result[0]
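A hedged rearrangement of the snippet above (assumption: the intent is to run this inside a flow and let Prefect call the tasks, rather than calling .run() by hand; the host, database, and query values are copied from the snippet):
from prefect import Flow
from prefect.tasks.postgres import PostgresFetch
from prefect.tasks.secrets import PrefectSecret

fetch_task = PostgresFetch(
    host='hostname.us-east-1.rds.amazonaws.com',
    db_name='testdatabase',
    user='test',
    query='select count(*) from testdatabase.test_table',
)

with Flow('record-count') as flow:
    password = PrefectSecret('TEST_PASSWORD')
    record_count = fetch_task(password=password)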
Rolf Schick
12/15/2020, 3:21 PMShawn Marhanka
12/15/2020, 5:11 PMRaghav Thirumulu
12/15/2020, 6:00 PMprofiles.yml
was created correctly. I'm able to run dbt commands with the newly created profiles.yml
outside of the Prefect package.Hui Zheng
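A sketch of running dbt from a flow against the generated profile (assumptions: Prefect's DbtShellTask is what's being used here, and the profiles.yml lives in a known directory; the path below is hypothetical):
from prefect import Flow
from prefect.tasks.dbt import DbtShellTask

dbt = DbtShellTask(
    profiles_dir="/path/to/profiles_dir",  # hypothetical location of profiles.yml
    return_all=True,
    log_stdout=True,
)

with Flow("dbt-flow") as flow:
    dbt(command="dbt run")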
12/15/2020, 6:23 PMFAIL
?Matt Drago
12/16/2020, 12:00 AMGCSResult
. I have Prefect running in a Kubernetes cluster, using the KubernetesRun
with Docker
storage. I've created the GCSResult
using:
result = GCSResult(
    bucket='REDACTED',
    location='flow-results/{flow_name}/{date:%Y/%m/%d}/{task_name}_{task_run_id}.prefect'
)
And assigned the result to the flow with:
with Flow("Redacted", schedule=schedule, result=result) as flow:
The weird thing is that the bucket name is being used to store the Result files, but the location template I provided is not. One thing I noticed was that for one task (a function with the @task
decorator), I passed in an argument with the name date
, and that task did have its Results stored in the location according to the template.Marc Lipoff
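One thing to try, as a sketch (assumption: attaching the templated GCSResult at the task level, with explicit checkpointing, so the location template is applied to that task's result rather than relying only on the flow-level result):
from prefect import task
from prefect.engine.results import GCSResult

result = GCSResult(
    bucket='REDACTED',
    location='flow-results/{flow_name}/{date:%Y/%m/%d}/{task_name}_{task_run_id}.prefect'
)

@task(result=result, checkpoint=True)
def my_task():
    return 42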
12/16/2020, 1:50 AMAn error occurred (UnauthorizedOperation) when calling the DescribeVpcs operation: You are not authorized to perform this operation.
Amanda Wee
12/16/2020, 3:13 AMmithalee mohapatra
12/16/2020, 3:27 AMMitchell Bregman
12/16/2020, 4:32 AMdocker.errors.APIError: 500 Server Error for <https://35.229.119.149:2376/v1.32/build?t=xxx-xxx.jfrog.io%2Fprefect%2Fprefect_qisvgvrv-compare_sqlserver_and_snowflake%3A0.0.0&q=False&nocache=False&rm=False&forcerm=True&pull=False&dockerfile=Dockerfile>: Internal Server Error ("Syntax error - can't find = in "Driver". Must be of the form: name=value")
Алексей Филимонов
12/16/2020, 8:21 AMv 0.13.12
, flows don't contain map and task runs without the restart option.Vlad Koshelev
12/16/2020, 8:46 AMreferences_task = StartFlowRun(flow_name='references-etl', project_name='etl', wait=True)
facts_task = StartFlowRun(flow_name='facts-etl', project_name='etl', wait=True)

with Flow('main-etl') as main_flow:
    files = get_files()
    references_task.map(files)
    facts_task.map(files)

@task
def get_group(file):
    # get the file's "group" name from the file (20200101-facts.csv -> group=20200101)
    ...

@task
def check_references_done(group, file):
    # get references tasks from the Prefect DB for the group and check they are done,
    # or check if the wait timeout has been reached (e.g. get the file's creation time
    # and check if now - created_at > timeout)
    ...

@task
def check_no_another_facts_running(group, file):
    # check that no "do_etl" tasks from the Prefect DB are in a "running" state
    ...

@task
def do_etl(group, file):
    ...

with Flow('facts-etl') as facts_flow:
    file = Parameter('file')
    group = get_group(file)
    check_references_done(group, file)
    check_no_another_facts_running(group, file)
    do_etl(group, file, upstream_tasks=[check_references_done, check_no_another_facts_running])
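One hedged refinement of the check tasks above (assumption: raising Prefect's RETRY signal so a not-yet-ready check puts itself back into a retrying state instead of silently succeeding; references_are_done is a hypothetical helper):
from datetime import timedelta
from prefect import task
from prefect.engine.signals import RETRY

@task(max_retries=10, retry_delay=timedelta(minutes=5))
def check_references_done(group, file):
    if not references_are_done(group):  # hypothetical helper querying the Prefect DB
        raise RETRY(f"references for group {group} are not ready yet")
    return True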