Matthew Blau
04/12/2021, 2:44 PM

Gabe Grand
04/12/2021, 4:40 PM
flow_1 (preprocessing) -> flow_2 (training) -> flow_3 (deployment)
Carlos Gutierrez
04/12/2021, 9:32 PM

Hugo Shi
04/12/2021, 11:54 PM

S K
04/13/2021, 4:57 AM
Hello, I want to schedule a flow all 7 days a week, but on weekends I want to reduce the run frequency to once every 4 hours instead of every hour. This is the piece of code that runs every hour from 6 am to 6 pm on all 7 days of the week; it needs modification for the weekend schedule.
    schedule = Schedule(
        # emit an event every hour
        clocks=[
            IntervalClock(
                start_date=pendulum.datetime(2021, 4, 1, tz="America/Los_Angeles"),
                interval=timedelta(hours=1),
            )
        ],
        # only include weekdays
        # filters=[filters.is_weekday],
        # only include 6am to 6pm
        filters=[
            filters.between_times(time(6), time(18))
        ],
    )
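One possible approach (a sketch, not from the thread, untested): keep the single hourly clock and add a custom filter. Schedule filters are just callables that take a candidate event time and return a bool, so a filter can keep every weekday event while keeping only every fourth hour on weekends.

    from datetime import timedelta, time

    import pendulum
    from prefect.schedules import Schedule, filters
    from prefect.schedules.clocks import IntervalClock

    def weekend_every_4_hours(dt) -> bool:
        # Monday-Friday (weekday() 0-4): keep every hourly event
        if dt.weekday() < 5:
            return True
        # Saturday/Sunday: keep only events whose hour is a multiple of 4
        return dt.hour % 4 == 0

    schedule = Schedule(
        # emit a candidate event every hour
        clocks=[
            IntervalClock(
                start_date=pendulum.datetime(2021, 4, 1, tz="America/Los_Angeles"),
                interval=timedelta(hours=1),
            )
        ],
        # keep only 6am-6pm events, thinned out on weekends by the custom filter
        filters=[
            filters.between_times(time(6), time(18)),
            weekend_every_4_hours,
        ],
    )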
Domantas
04/13/2021, 10:55 AM
Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n AttributeError("type object \'Members\' has no attribute \'ids\'")\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - prefect: (flow built with \'0.14.15\', currently running with \'0.14.12\')')
Funny thing: running locally (flow.run()) this error does not appear; it only occurs when executing the code via the server UI (I will attach a screenshot with the error log).
For context, I'm running Rust code from the Python script, which seems to be the main problem when using the UI (running locally it works perfectly fine, so the code itself seems to be good).
Maybe someone knows how to solve this problem?
If any relevant information is missing, or another thread has already been created for this problem, let me know!
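The error itself points at a prefect version mismatch (flow built with 0.14.15, executed with 0.14.12). A minimal sketch of one way to keep the two aligned, assuming Docker storage is in use (the message does not say which storage the flow has, so the names below are placeholders):

    from prefect import Flow
    from prefect.storage import Docker

    with Flow("example-flow") as flow:  # placeholder flow
        ...

    # build the flow image on a base image whose prefect version matches the
    # version used to register the flow, so build and execution environments agree
    flow.storage = Docker(
        registry_url="my-registry.example.com",  # placeholder registry
        base_image="prefecthq/prefect:0.14.15",
    )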
Igor Bondartsov
04/13/2021, 11:22 AM

Sven Teresniak
04/13/2021, 11:45 AM
About prefect register:
1. What is the purpose of prefect build? I think I understand what it does. But when would I ever want to use prefect build + prefect register --json?
2. Is there any difference between prefect register -p flow.py on a flow file and calling flow.register(…) as part of a python flow.py call? The latter seems much more dynamic (e.g. it can do some initialization/behaviour changes in __main__).
Rowen
04/13/2021, 12:54 PM
My flow runs fine locally with flow.run() at the end of my Python file. However, when I trigger the flow in the Prefect Cloud UI, it fails at the extract stage, the error being "At least one upstream state has an unmappable result". Below is the code; I will elaborate more in the thread.
    from prefect import Flow, task
    from prefect.storage import S3

    @task
    def transform(x):
        return x + 30

    @task
    def extract():
        return [200, 400, 500]

    with Flow("flow-name", storage=S3(bucket="bucket_name")) as flow:
        e = extract()
        t = transform.map(e)  # fails when I trigger the flow in the UI
Stéphan Taljaard
04/13/2021, 2:15 PM

Jonas
04/13/2021, 2:58 PM
    flow_database = StartFlowRun(flow_name="database", project_name="test", wait=True)
    flow_transform = StartFlowRun(flow_name="transform", project_name="test", wait=True)

    with Flow("parent-flow") as flow:
        data = flow_database()   # get a dataframe from flow
        flow_transform(data)     # take the dataframe and use it in another flow
How should I go about this?
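A hedged sketch of one common pattern (parameter names and the S3 path below are placeholders, not from the thread): StartFlowRun does not hand back the child flow's return value, so the dataframe is usually persisted somewhere and only its location is passed to the downstream flow as a parameter.

    from prefect import Flow, Parameter
    from prefect.tasks.prefect import StartFlowRun

    flow_database = StartFlowRun(flow_name="database", project_name="test", wait=True)
    flow_transform = StartFlowRun(flow_name="transform", project_name="test", wait=True)

    with Flow("parent-flow") as flow:
        # location the "database" flow is assumed to write its dataframe to
        data_path = Parameter("data_path", default="s3://my-bucket/df.parquet")
        db_run = flow_database(parameters={"output_path": data_path})
        # pass the location, not the dataframe object, to the "transform" flow
        flow_transform(parameters={"input_path": data_path}, upstream_tasks=[db_run])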
Charles Liu
04/13/2021, 3:52 PM

Adam Lewis
04/13/2021, 4:11 PM

Matthew Blau
04/13/2021, 6:47 PM
We are passing environment=env_vars in the CreateContainer() function, but we need to have it inject updated values at runtime. I have had success with
    flow.run_config = DockerRun(env={f"PREFECT__CONTEXT__SECRETS__{k}": v for k, v in config.context.secrets.items()})
but we are not utilizing Prefect's config.toml for this project, and the flow is not being executed within a Docker container.

Ricardo Portela
04/13/2021, 7:26 PM

Will Milner
04/13/2021, 8:28 PM

Ismail Cenik
04/13/2021, 8:31 PM

Jacob Blanco
04/14/2021, 5:42 AM
set_flow_group_schedule.
edit: Never mind, I found it here: https://github.com/PrefectHQ/server/blob/4a9887091ffabe1a8c7098d6603663779d63e8eb/tests/graphql/test_flow_groups.py#L155
Leaving this here for posterity.

Hawkar Mahmod
04/14/2021, 7:02 AM
Should I use flow.register() or prefect register? I'm using S3 Storage and an ECS Run Config. I was previously using Docker Storage but didn't like rebuilding the image each time the flow code changed. I am using AWS CodePipeline/CodeBuild, so I have a Docker daemon at my disposal, and I have a base image that contains the whole repo the flow sits in. Ideally, in my build environment I can build the base image if necessary, and register my flow with Prefect Cloud. However, it seems I can't register unless I have all my dependencies, which happen to sit in the base image anyway. So perhaps I docker run and register this way?
I've read the GitHub discussion on flow deployment patterns but I am not fully clear on this particular question.
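For reference, a sketch of the setup being described (bucket, image, and project names are placeholders): S3 Storage holds the serialized flow while the ECSRun run config points at the pre-built base image, so only the registration step needs the dependencies importable.

    from prefect import Flow
    from prefect.run_configs import ECSRun
    from prefect.storage import S3

    with Flow("my-flow") as flow:  # placeholder flow
        ...

    flow.storage = S3(bucket="my-flow-storage")                      # placeholder bucket
    flow.run_config = ECSRun(image="my-registry/base-image:latest")  # pre-built base image
    flow.register(project_name="my-project")                         # placeholder project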
Varun Joshi
04/14/2021, 8:33 AM
    from prefect.schedules import IntervalSchedule, filters
I made the change below in my code:
    ist = pendulum.timezone('Asia/Calcutta')
    schedule = Schedule(
        clocks=[IntervalClock(interval=datetime.timedelta(minutes=1))],
        filters=filters.between_times(pendulum.time(6), pendulum.time(10)),
    )
However, this doesn't seem to be working.
Could someone please guide me on how to import filters and apply them to my schedule?
Thank you,
Varun
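For comparison, a hedged sketch of the same schedule with the pieces that commonly trip people up (untested; the start date is an assumption): filters must be passed as a list, and Schedule, IntervalClock, and the filters module all come from the prefect.schedules package.

    import datetime

    import pendulum
    from prefect.schedules import Schedule, filters
    from prefect.schedules.clocks import IntervalClock

    ist = pendulum.timezone("Asia/Calcutta")
    schedule = Schedule(
        clocks=[
            IntervalClock(
                start_date=pendulum.datetime(2021, 4, 14, tz=ist),  # assumed start date
                interval=datetime.timedelta(minutes=1),
            )
        ],
        # filters is a list of callables
        filters=[filters.between_times(pendulum.time(6), pendulum.time(10))],
    )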
tash lai
04/14/2021, 10:06 AM

Aurélien Vallée
04/14/2021, 11:41 AM
What is the purpose of build=True? The doc states: "if True, the flow's environment is built prior to serialization; defaults to True". Basically, I'm trying to automate the registration of flows in a CI environment, and I get warnings such as:
    UserWarning: A flow with the same name is already contained in storage; if you changed your Flow since the last build, you might experience unexpected issues and should re-create your storage object.
      serialized_flow = flow.serialize(build=build)
Well, basically, since I'm automating the registration of flows, I do not really know which ones are new, which ones are identical, and which ones changed.

Aurélien Vallée
04/14/2021, 11:42 AM
Passing build=False does not trigger the warning, though I don't really understand the impact it has. Should I somehow manage to provide build=True the first time a flow is registered, and then build=False during updates?
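A hedged sketch of how this is often handled in CI (the project name is a placeholder, and it assumes a recent 0.14.x release): instead of toggling build per flow, register every flow with an idempotency key derived from its serialized form, so unchanged flows keep their current version.

    from prefect import Flow, task

    @task
    def noop():
        pass

    with Flow("ci-registered-flow") as flow:  # placeholder flow
        noop()

    if __name__ == "__main__":
        flow.register(
            project_name="my-project",               # placeholder project
            idempotency_key=flow.serialized_hash(),  # unchanged flows are not re-versioned
        )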
Robin
04/14/2021, 12:27 PM
There was a problem creating your schedule, please try again shortly. Error message: Error: GraphQL error: Invalid clock provided for schedule: {'type': 'CronClock', 'cron': '0 * * * *', 'parameter_defaults': {...}, 'start_date': {'dt': '2021-04-14T14:21:57.622602', 'tz': 'Europe/Amsterdam'}}
tash lai
04/14/2021, 1:42 PM
    @task
    def donothing(x):
        pass

    with Flow('useless') as flow:
        lst = list(range(4000))
        donothing.map(lst)

    flow.executor = DaskExecutor('tcp://localhost:8786')
The worker is started with dask-worker --nthreads=50. The thing is, the worker quickly eats up a lot of memory with each mapped task run, up to a gigabyte by the end of the flow, and that memory is not cleared when the flow finishes. The project I'm working on implies running up to ~100,000 IO-heavy tasks, so seeing this I'm a little worried that Prefect might not be the right tool for the job. But maybe it's me doing something wrong?
Yuliy Kuroki
04/14/2021, 1:54 PM

Jeffery Newburn
04/14/2021, 4:32 PM

Varun Joshi
04/14/2021, 5:22 PM

Justin Chavez
04/14/2021, 8:12 PM
I have a task called run_command, and inside it is launching the command on a RunNamespacedJob to use Kubernetes. I have multiple commands that take a while to complete, so I would like multiple Namespaced Jobs to run at the same time. I tried using a mapping like:
    with Flow(...) as flow:
        run_command.map([cmd1, cmd2, ...])
But Prefect is running each Namespaced Job in serial. Would switching to a Dask executor be the key? Or could I adjust the map function to achieve parallelization?
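A hedged sketch (the task body is a stand-in for the RunNamespacedJob call): mapped task runs only execute concurrently under a Dask-based executor; with the default LocalExecutor they run one after another, so attaching a LocalDaskExecutor (or a DaskExecutor) is the usual fix rather than changing the map call.

    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor

    @task
    def run_command(cmd):
        ...  # placeholder for the RunNamespacedJob logic

    with Flow("parallel-k8s-jobs") as flow:  # placeholder flow name
        run_command.map(["cmd1", "cmd2"])

    # scheduler and worker count are assumptions; tune for the workload
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)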
Riley Hun
04/14/2021, 10:51 PM