Marvin
09/16/2020, 11:59 AM

sebastian.clanzett
09/16/2020, 12:40 PM
with Flow("Flow1") as flow:
    config = get_config('local')
    value1, value2 = task1()
If I try this I get: TypeError: 'FunctionTask' object is not iterable

Dinu Gherman
09/16/2020, 1:29 PM
I ran prefect backend server and prefect server start, got the web UI up, and wonder how I can make the standard hello-world example below appear in that UI. Do I need a “project”? If so, how do I wire that up with the example code running?
from prefect import task, Flow, Parameter

@task(log_stdout=True)
def say_hello(name):
    print("Hello, {}!".format(name))

with Flow("My First Flow") as flow:
    name = Parameter('name')
    say_hello(name)

flow.run(name='world')   # "Hello, world!"
flow.run(name='Marvin')  # "Hello, Marvin!"
Charlie Cahoon
09/16/2020, 4:02 PM
We're using KubernetesJobEnvironment and some of our flows are failing with the error below. I'm sure it's something minor... if we wait a few minutes and retry from the UI, it does eventually run.
Failed to create Kubernetes job: (409)
Reason: Conflict
HTTP response headers: HTTPHeaderDict({'Audit-Id': '8b3f7073-0520-49a3-9d38-60d761af5fbe', 'Content-Type': 'application/json', 'Date': 'Wed, 16 Sep 2020 15:55:38 GMT', 'Content-Length': '242'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"workflows-prefect-job\" already exists","reason":"AlreadyExists","details":{"name":"workflows-prefect-job","group":"batch","kind":"jobs"},"code":409}
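A hedged guess at the 409: if the custom job spec passed to KubernetesJobEnvironment pins metadata.name (here workflows-prefect-job), two overlapping flow runs will try to create a Job with the same name, and the retry only succeeds once the first Job has been cleaned up. When the exact name is not load-bearing, Kubernetes can generate a unique one instead; a sketch against a made-up minimal spec:

```python
# Minimal stand-in for a custom Kubernetes Job spec (illustrative only).
job_spec = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "workflows-prefect-job"},
    "spec": {"template": {"spec": {"containers": [], "restartPolicy": "Never"}}},
}

# Swap the fixed name for a prefix; the API server appends a unique suffix,
# so concurrent runs no longer collide with AlreadyExists.
job_spec["metadata"].pop("name", None)
job_spec["metadata"]["generateName"] = "workflows-prefect-job-"
```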
Glen Trudeau
09/16/2020, 4:08 PM
Failed to load and execute Flow's environment: TypeError('an integer is required (got type bytes)')
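This particular TypeError is commonly reported when the Python minor version that registered (pickled) the flow differs from the one executing it, for example registering under 3.8 and running inside a 3.7 image. One way to keep the two aligned is to derive the execution image tag from the registering interpreter; python:X.Y-slim below is just an illustrative tag to pass as the base image of Prefect 0.x Docker storage:

```python
import sys

# Build an image tag matching the local interpreter, e.g. "python:3.8-slim".
# Using this as Docker storage's base_image keeps the registration and
# execution Python versions in sync.
python_tag = "python:{}.{}-slim".format(*sys.version_info[:2])
print(python_tag)
```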
Vipul
09/16/2020, 4:28 PM

kevin
09/16/2020, 4:33 PM

Leonard Marcq
09/16/2020, 7:50 PM
I have been using DaskCloudProviderEnvironment; it works pretty well and was much easier to set up than I expected. But it seems that the cluster that gets spawned runs the Dask tasks in the default VPC in AWS (and its associated security group), and I would prefer those tasks to run in a VPC other than the default one. I did not find much info on the network config for that in the docs or in Dask Cloud Provider's documentation. Ideally I would like to pass specific security groups, subnets, and so on for the Dask cluster.
(1) Is there a way of passing a network config to the Fargate Dask cluster through the DaskCloudProviderEnvironment interface (or some other way)?
(2) Does it matter what kind of agent picks up that flow? From what I understand, the only thing the agent does in this case is spawn a cluster with the scheduler and worker nodes in it and check that the tasks completed at the end (and maybe tear down the Dask cluster?), so I guess any agent could do the job. Or does it specifically have to be a Fargate agent for some reason?

Hui Zheng
09/16/2020, 11:02 PM
Our flow runs [1] are stuck in Scheduled to start, but there are no run logs and no GKE job was created. All runs scheduled for the past 3 hours had the same issue. If we manually start a flow run, it also runs into the same issue. The project agent shows a good status. Those delayed runs won't even show up in the flow runs tab. It seems to me that the flow scheduled the run, but somehow was not able to start it when the time came.
[1] https://cloud.prefect.io/semios/flow/dcf941bd-6365-45e8-aafd-71daae6c29f0?version=7

Valentin Willscher
09/17/2020, 7:58 AM

Adam
09/17/2020, 9:32 AM
I'm looking at PostgresFetch. It seems to require user and host arguments when declaring the task, and then password can be supplied when calling it. How can I send all of them when I call it instead? I'm trying to achieve something like this:
get_customers = PostgresFetch(
    name="Get customers ready for migration",
    db_name="postgres",
    user="CAN_I_USE_ENV_SECRET_HERE?",
    host="CAN_I_USE_ENV_SECRET_HERE?",
    query="""
        SELECT DISTINCT customer_id
        FROM customer_migration_request
    """,
    fetch="all",
)
with Flow("migrate customers") as flow:
    postgres_user = EnvVarSecret("POSTGRES_USER", raise_if_missing=True)
    postgres_password = EnvVarSecret("POSTGRES_PASSWORD", raise_if_missing=True)
    postgres_host = EnvVarSecret("POSTGRES_HOST", raise_if_missing=True)
    customers = get_customers(
        password=postgres_password, user=postgres_user, host=postgres_host
    )
Iain Dillingham
09/17/2020, 1:53 PM
prefect.context.scheduled_start_time contains slightly different timezone information depending on whether I run the flow with my_flow.run() or whether it is run by the Docker agent.
• Run with my_flow.run(), the object is as follows: DateTime(2020, 9, 17, 13, 28, 18, 669528, tzinfo=Timezone('UTC')).
• Run by the Docker agent, the object is as follows: DateTime(2020, 9, 17, 13, 45, 0, tzinfo=Timezone('+00:00')).
The distinction is important even though they are both instances of pendulum.DateTime: calling dt.to_iso8601_string() results in a different string, because if the timezone is UTC, this method replaces the trailing +00:00 with Z. Whether or not this is a good idea, it would be good for prefect.context.scheduled_start_time to return consistent timezone information.

simone
09/17/2020, 2:02 PM
out = func.map([A1, A2, A3, A4, A5, B1, B2, B3, B4, B5])
In the next step I would like to partially reduce the output and combine only the matching subgroups, e.g. combining A = out[0:5] and B = out[5:], and then process A and B in parallel.
I have three questions:
(1) If I understood correctly, order matters for mapping in Prefect, so input and output have the same order, correct?
(2) I am running the code on an HPC. If I proceed this way, will the entire out be collected in memory, or will the different output groups be dispatched to the specific worker where the reduce is happening?
(3) Is there a more efficient way to do this?
Thanks a lot!

Pedro Machado
09/17/2020, 4:18 PM

ale
09/17/2020, 4:25 PM

sundaram
09/17/2020, 4:26 PM

jars
09/17/2020, 6:08 PM

Hamza Ahmed
09/17/2020, 7:30 PM

Marvin
09/17/2020, 7:30 PM

Steven Kalt
09/17/2020, 8:26 PM

Marvin
09/17/2020, 8:26 PM

Glen Trudeau
09/17/2020, 9:10 PM

Marvin
09/17/2020, 9:10 PM

Sam
09/17/2020, 9:26 PM

Marvin
09/17/2020, 9:26 PM

Jacob Blanco
09/18/2020, 4:46 AM

sark
09/18/2020, 4:55 AM

Lewis Bails
09/18/2020, 6:15 AM

Marvin
09/18/2020, 6:15 AM

Fanilo A.
09/18/2020, 7:09 AM