Max
11/03/2020, 1:57 PM
Chris White
Priyan Chandrapala
11/03/2020, 5:14 PM
Lukas N.
11/03/2020, 5:15 PM
S3Result to my flows and I have encountered a weird error (template issue?), which I tried to reproduce in a minimal example. I'm running the flow through Prefect Server, which I set up using prefect server start, and the local agent, started with prefect agent start. More info in thread.
matta
11/03/2020, 9:21 PM
matta
11/03/2020, 9:22 PM
matta
11/03/2020, 9:56 PM
Ben Fogelson
11/03/2020, 10:19 PM
apply_map with functions that create parameters. Here’s a minimal example:
from prefect import Flow, Parameter, unmapped, apply_map

def func(x, y):
    z = Parameter('z')
    w = x + y + z
    return w

with Flow('flow') as flow:
    x = Parameter('x')
    y = Parameter('y')
    w = apply_map(func, x, y)
---------------------------------------------------------------------------
ValueError: Parameters must be root tasks and can not have upstream dependencies.
The error comes when apply_map tries to add edges from the argument tasks passed to apply_map to the root tasks of the subgraph. This fails when it tries to make an edge from x to z, since z is a parameter.
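The mechanism described above can be reproduced with a toy, stdlib-only graph. The classes below are hypothetical stand-ins rather than Prefect's own, and the wiring step is an assumption based on the description in this message. Case 1 creates z inside the mapped function and hits the same ValueError; case 2 creates z in the parent flow (roughly what passing unmapped(z) to apply_map would amount to), so z only ever gets downstream edges.

```python
# Toy, stdlib-only model of the edge rule quoted in the error above.
# Task, Parameter and Graph are hypothetical stand-ins, not Prefect classes.


class Task:
    def __init__(self, name):
        self.name = name


class Parameter(Task):
    """A parameter must stay a root task: no upstream edges allowed."""


class Graph:
    def __init__(self):
        self.edges = []

    def add_edge(self, upstream, downstream):
        # Same constraint as the ValueError in the traceback.
        if isinstance(downstream, Parameter):
            raise ValueError(
                "Parameters must be root tasks and can not have upstream dependencies."
            )
        self.edges.append((upstream.name, downstream.name))


def connect_args_to_roots(graph, arg_tasks, subgraph_roots):
    """Assumed apply_map step: wire every argument task to every root
    task of the subgraph that func built."""
    for arg in arg_tasks:
        for root in subgraph_roots:
            graph.add_edge(arg, root)


x, y = Parameter("x"), Parameter("y")
add = Task("x + y")          # root of the subgraph: consumes the arguments
total = Task("(x + y) + z")  # downstream step that also needs z
z = Parameter("z")

# Case 1: z is created inside func, so it is also a subgraph root.
inside = Graph()
try:
    connect_args_to_roots(inside, [x, y], [add, z])
    inside_error = None
except ValueError as exc:
    inside_error = str(exc)

# Case 2: z lives in the parent flow, so it is never a subgraph root.
hoisted = Graph()
connect_args_to_roots(hoisted, [x, y], [add])
hoisted.add_edge(add, total)
hoisted.add_edge(z, total)  # z only gains a downstream edge
```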
This error seems to preclude ever using apply_map with a function that creates a parameter. I’m wondering if that constraint is necessary. My intuition is that if apply_map sees a parameter that is in the subflow but not in the parent flow, it should assume that any edges from that parameter are unmapped edges that should be added to the parent flow. Does this seem right?
Marwan Sarieddine
11/04/2020, 12:58 AM
Jasono
11/04/2020, 1:04 AM
abhilash.kr
11/04/2020, 6:56 AM
psimakis
11/04/2020, 7:51 AM
A and flow B. The cron schedule for flow B is 0 11 * * 3 (one event every Wednesday at 11:00 AM). The last time I registered flow A, I noticed that two runs had been scheduled for flow B for the upcoming Wednesday (both at 11:00 AM). When I re-registered flow B, the scheduling was fixed.
Do you have any idea why this is happening?
Thanks.
python: 3.7.3
prefect: 0.13.12
OS: Debian GNU/Linux 10 (buster)
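For reference, 0 11 * * 3 should yield exactly one run per Wednesday at 11:00. The stdlib sketch below computes the expected slots for that expression and flags any slot that appears twice in a list of scheduled start times. Fetching the scheduled runs from Prefect is left out, the observed list is made up to mirror the report, and time zones are ignored.

```python
from datetime import datetime, timedelta


def next_weekly_runs(start, count, weekday=2, hour=11, minute=0):
    """Next `count` firings after `start` of a weekly cron such as '0 11 * * 3'.

    Cron numbers Wednesday as 3 (Sunday = 0); Python's datetime.weekday()
    numbers Monday as 0, so Wednesday is 2 here.
    """
    candidate = start.replace(hour=hour, minute=minute, second=0, microsecond=0)
    candidate += timedelta(days=(weekday - candidate.weekday()) % 7)
    if candidate <= start:  # this week's slot has already passed
        candidate += timedelta(weeks=1)
    return [candidate + timedelta(weeks=i) for i in range(count)]


def duplicate_slots(scheduled_times):
    """Start times that occur more than once in a list of scheduled runs."""
    seen, dupes = set(), set()
    for ts in scheduled_times:
        if ts in seen:
            dupes.add(ts)
        seen.add(ts)
    return sorted(dupes)


# 2020-11-04 was a Wednesday; starting before 11:00 keeps that day's slot.
expected = next_weekly_runs(datetime(2020, 11, 4, 7, 51), count=2)
# Hypothetical list shaped like the report: two runs in the same Wednesday slot.
observed = [expected[0], expected[0], expected[1]]
duplicates = duplicate_slots(observed)
```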
Radek Tomsej
11/04/2020, 11:07 AM
uttam K
11/04/2020, 11:42 AM
Roey Brecher
11/04/2020, 1:18 PM
map with StartFlowRun?
Philip MacMenamin
11/04/2020, 6:28 PM
Agent connecting to the Prefect API at http://localhost:4200, but I'm unable to see or use the agent. (If I run curl on localhost:4200, I can connect without issue.)
Hui Zheng
11/04/2020, 7:15 PM
task_a and then I have a task_b, which takes the output of task_a() as its upstream input. task_b always runs and processes the task_a result, even when task_a fails.
from random import random

from prefect import Flow, task, triggers
from prefect.engine import signals

@task
def task_a(threshold: float):
    result = random()
    if result > threshold:
        return result
    else:
        # question 1: how do I get task_a to return the result of random() even when the FAIL signal is raised?
        raise signals.FAIL("failed, result is less than the threshold")

@task(trigger=triggers.always_run)
def task_b(result_a: float):
    # question 2: how could task_b check whether task_a succeeded or failed?
    if result_a.is_failed():  # ???
        print('task_a failed with result {}'.format(result_a))
    else:
        print('task_a succeeded with result {}'.format(result_a))

with Flow('task_b always handles task_a failure') as flow:
    result_a = task_a(0.3)
    result_b = task_b(result_a)
I have two questions:
1. How do I get task_a to return the result of random() even when the FAIL signal is raised?
2. How can task_b check whether task_a succeeded or failed?
Hui Zheng
11/04/2020, 7:38 PM
https://github.com/PrefectHQ/prefect/issues/3614
Please see this link or the thread for the details.
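The pattern behind both task_a/task_b questions can be modelled in plain Python: the failure carries the value computed before failing, and the always-run downstream step branches on whether it received a value or a failure. This is a sketch of the idea only; TaskFailed and run_upstream are hypothetical names, and whether Prefect's signals.FAIL can carry a value this way, and exactly what an always_run task receives from a failed upstream task, is not confirmed here (see the linked issue).

```python
import random


class TaskFailed(Exception):
    """Hypothetical failure signal that carries the value computed before failing."""

    def __init__(self, message, result=None):
        super().__init__(message)
        self.result = result


def task_a(threshold, rng=random.random):
    result = rng()
    if result > threshold:
        return result
    # Question 1: keep the value by attaching it to the failure.
    raise TaskFailed("failed, result is less than the threshold", result=result)


def run_upstream(fn, *args, **kwargs):
    """Return fn's value, or the TaskFailed it raised, so a downstream
    step that always runs can still inspect what happened."""
    try:
        return fn(*args, **kwargs)
    except TaskFailed as exc:
        return exc


def task_b(result_a):
    # Question 2: branch on what arrived rather than on a state method.
    if isinstance(result_a, TaskFailed):
        return "task_a failed with result {}".format(result_a.result)
    return "task_a succeeded with result {}".format(result_a)


print(task_b(run_upstream(task_a, 0.3)))
```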
Saatvik Ramisetty
11/04/2020, 9:05 PM
curl -I http://myserver seems to return:
HTTP/1.1 200 OK
Date: Wed, 04 Nov 2020 20:57:57 GMT
Content-Type: text/html
Content-Length: 12382
Connection: keep-alive
Server: nginx/1.18.0
Last-Modified: Tue, 13 Oct 2020 23:49:16 GMT
ETag: "5f863cfc-305e"
Accept-Ranges: bytes
But I can't seem to connect to the UI.
Jakub Hettler
11/04/2020, 10:52 PM
Matt Drago
11/05/2020, 1:01 AM
Unexpected error: PicklingError('Pickling client objects is explicitly not supported.\nClients have non-trivial state that is local and unpickleable.',)
I'm configuring the GCP security credentials in .prefect/config.toml under the [context.secrets] section, using the GCP_CREDENTIALS field.
When I run flow.run(), the flow completes successfully. I then call flow.register(...), start the agent with flow.run_agent(), and manually kick off a run in the UI.
The following is an example of how I am adding the BigQueryLoadGoogleCloudStorage task to the flow:
from prefect import Flow
from prefect.tasks.gcp.bigquery import BigQueryLoadGoogleCloudStorage
from google.cloud.bigquery import SchemaField

load_to_bigquery = BigQueryLoadGoogleCloudStorage(
    dataset_id="data_lake_dev",
    project="data-platform"
)

with Flow("Move Data From DB to BQ") as flow:
    load_to_bigquery(
        uri="https://storage.cloud.google.com/data-platform-dev/a-csv-file.csv",
        table="website_data",
        schema=[
            SchemaField(name="id", field_type="INT64", mode="REQUIRED"),
            SchemaField(name="uuid", field_type="STRING"),
            SchemaField(name="site", field_type="STRING"),
            SchemaField(name="path", field_type="STRING"),
            SchemaField(name="type", field_type="STRING", mode="REQUIRED"),
            SchemaField(name="reported_at", field_type="TIMESTAMP", mode="REQUIRED"),
            SchemaField(name="helpful", field_type="BOOL", mode="REQUIRED"),
            SchemaField(name="message", field_type="STRING")
        ]
    )
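Errors like this one (and the _thread.lock error later in this log) generally mean that an object holding live, process-local state, such as an API client or a lock, was handed to pickle, for example when a flow or a checkpointed task result is serialized. That would be consistent with flow.run() succeeding locally while the agent-run fails, although this excerpt does not show where the pickling actually happens. The stdlib sketch below uses threading.Lock as a stand-in for a client: building it eagerly makes the object's state unpicklable, while creating it inside run() keeps only plain configuration on the object. EagerLoader and LazyLoader are hypothetical names, not Prefect classes.

```python
import pickle
import threading


class EagerLoader:
    """Creates its client at construction (flow-build) time, so the client
    travels with the object whenever the object is serialized."""

    def __init__(self, dataset_id):
        self.dataset_id = dataset_id
        self.client = threading.Lock()  # stand-in for a live API client


class LazyLoader:
    """Stores only plain configuration; the client exists only inside run()."""

    def __init__(self, dataset_id):
        self.dataset_id = dataset_id

    def run(self):
        client = threading.Lock()  # created at run time, never kept on self
        with client:
            return "loaded into {}".format(self.dataset_id)


def unpicklable_attributes(obj):
    """Names of instance attributes that pickle refuses to serialize."""
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except (TypeError, AttributeError, pickle.PicklingError):
            bad.append(name)
    return bad


print(unpicklable_attributes(EagerLoader("data_lake_dev")))  # ['client']
print(unpicklable_attributes(LazyLoader("data_lake_dev")))   # []
```

The same helper can be pointed at any object suspected of dragging a client or lock into serialization, to see which attribute is the culprit.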
Neeraj Sharma
11/05/2020, 5:54 AM
Joël Luijmes
11/05/2020, 7:14 AM
LOOP and write the results on each iteration, as documented here. However, it doesn’t seem to work: only the last iteration is written.
import prefect
from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.engine.signals import LOOP

@task()
def log_output(result):
    logger = prefect.context.get('logger')
    logger.info(result)

@task(result=LocalResult(dir='./results', location='test-{task_loop_count}.prefect'))
def loop_test():
    # payload handed over by the previous iteration's LOOP signal
    loop_payload = prefect.context.get("task_loop_result", {})
    n = loop_payload.get("n", 1)
    print(n)
    if n > 5:
        return n
    raise LOOP(f'Iteration {n}', result=dict(n=n+1))

with Flow("Postgres -> BigQuery") as flow:
    x = loop_test()
    log_output(x)
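For comparison, the behaviour being asked for can be modelled in plain Python: on each pass the location template is formatted with that pass's loop counter, so six iterations should leave six files. This is not Prefect's result-writing code; LOCATION_TEMPLATE mirrors the template in the task above, and the 1-based counter and what gets written per iteration are assumptions.

```python
import itertools
import json
import tempfile
from pathlib import Path

LOCATION_TEMPLATE = "test-{task_loop_count}.prefect"


def run_loop(result_dir, stop_after=5):
    """Model of a looping task that persists every iteration's output.

    The location template is formatted with the current loop counter on
    each pass, so each iteration writes a distinct file instead of
    overwriting a single one.
    """
    payload = {"n": 1}
    for loop_count in itertools.count(1):  # counter assumed to start at 1
        n = payload["n"]
        done = n > stop_after
        output = n if done else {"n": n + 1}
        location = LOCATION_TEMPLATE.format(task_loop_count=loop_count)
        Path(result_dir, location).write_text(json.dumps(output))
        if done:
            return n
        payload = output


with tempfile.TemporaryDirectory() as result_dir:
    final = run_loop(result_dir)
    written = sorted(p.name for p in Path(result_dir).iterdir())
    first = json.loads(Path(result_dir, "test-1.prefect").read_text())
    last = json.loads(Path(result_dir, "test-6.prefect").read_text())
```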
See output logging and diagnostics in thread.
ale
11/05/2020, 8:58 AM
@task support both name and task_run_name at the same time? It seems that name takes precedence over task_run_name; am I right?
emre
11/05/2020, 9:47 AM
secrets that I need to use in most of my tasks in a flow. I hate the visual of an upstream dependency on the same secret task from almost all of my tasks. Is there a way for custom secrets to behave like, say, AWS_CREDENTIALS, as per https://docs.prefect.io/core/concepts/secrets.html#default-secrets? Or is there a Prefect-backend-compatible way of having my secrets reside in prefect.context, or a similar context-like construct?
Robert Sokolowski
11/05/2020, 3:39 PM
simone
11/05/2020, 3:44 PM
Hui Zheng
11/05/2020, 6:06 PM
Robin
11/05/2020, 8:57 PM
TypeError: can't pickle _thread.lock objects
See full error message below.
Any ideas what could be the reason?
Rob Fowler
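11/05/2020, 10:25 PM
from prefect import Flow, Parameter, task
from schema import Optional, Schema  # assuming the `schema` package

# HostParam, winrm_powershell and load_script are not shown here;
# presumably they are defined elsewhere in the asker's code.

@task
def get_host_from_kv(opts):
    hid = Schema({
        'host': str,
        Optional('identity', default=0): int
    }).validate(opts.kvopts)
    return HostParam(identity=hid['identity'], host=hid['host'])

with Flow("Query a single host") as flow:
    opts = Parameter('opts')
    result = winrm_powershell(host=get_host_from_kv(opts),
                              command=load_script(opts, 'win_ps_version.ps1'),
                              opts=opts)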