Shivam Bhatia
01/11/2022, 11:23 AM

Vamsi Reddy
01/11/2022, 1:36 PM
We have a list of modules, [module_name1, module_name2, ....], and we use a for loop to iterate over the list, submit the steps, and check their status. However, Prefect does not submit these steps in the order of the list. How do I make sure the order is maintained? Below is a screenshot of the code:
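A minimal sketch of one way to enforce ordering, assuming Prefect 1.x (submit_step is a hypothetical stand-in for the task in the screenshot): tasks created in a loop have no dependencies between them, so the executor is free to run them in any order; chaining each call to the previous one with set_upstream forces sequential execution.

from prefect import Flow, task

@task
def submit_step(module_name: str):
    ...  # submit the step and poll its status here

with Flow("ordered-modules") as flow:
    previous = None
    for module_name in ["module_name1", "module_name2"]:
        step = submit_step(module_name)
        if previous is not None:
            # No data passes between iterations, so declare the order explicitly.
            step.set_upstream(previous)
        previous = step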
Henrietta Salonen
01/11/2022, 2:18 PM

Ben Collier
01/11/2022, 2:23 PM

Prudhvi Kalakota
01/11/2022, 2:33 PM

Tom Shaffner
01/11/2022, 3:48 PM
result=LocalResult(location='{plan}_plan_data.prefect')
In my case my mappings are "plans". The problem, though, is that in one case a plan/map pulls data and caches it, and then a different plan/map subsequently reads that same data from cache and uses it!
Any idea what might cause this kind of behavior? It causes a bunch of my plan/maps to just not work.
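A hedged guess at the cause: the location template varies only by plan, so any two mapped tasks that format it with the same plan value resolve to the same file, and the later task finds the earlier task's output at its target location and treats it as its own cached result. Including something task-specific in the template, e.g. task_name from prefect.context, keeps the locations distinct:

from prefect.engine.results import LocalResult

# Sketch: task_name is filled in from prefect.context at runtime, so two
# different tasks mapped over the same plan no longer share a file.
result = LocalResult(location="{task_name}_{plan}_plan_data.prefect")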
Gabriel Milan
01/11/2022, 4:06 PM
Unexpected error: TypeError('no default __reduce__ due to non-trivial __cinit__')
Traceback (most recent call last):
  File "/opt/venv/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 926, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/opt/venv/lib/python3.9/site-packages/prefect/engine/results/gcs_result.py", line 75, in write
    binary_data = new.serializer.serialize(new.value)
  File "/opt/venv/lib/python3.9/site-packages/prefect/engine/serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "/opt/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/opt/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
  File "stringsource", line 2, in pymssql._mssql.MSSQLConnection.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
I'm using Prefect 0.15.9 and pymssql 2.2.3. The task that raised it is the following:
import pymssql
from prefect import task

@task
def sql_server_get_connection(server: str, user: str, password: str, database: str):
    """
    Returns a connection to the SQL Server.
    """
    log(f"Connecting to SQL Server: {server}")
    # pylint: disable=E1101
    return pymssql.connect(
        server=server, user=user, password=password, database=database
    )
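The traceback points at result checkpointing rather than the query itself: the task returns a live pymssql connection, and Prefect tries to cloudpickle that return value into the configured GCS result, which a C-extension connection object cannot survive. A sketch of two possible workarounds (run_query is a hypothetical example task):

import pymssql
from prefect import task

# Option 1: disable checkpointing so the return value is never serialized
# to the result backend.
@task(checkpoint=False)
def sql_server_get_connection(server: str, user: str, password: str, database: str):
    return pymssql.connect(server=server, user=user, password=password, database=database)

# Option 2 (safer with distributed executors): open and close the connection
# inside the task that needs it, and return plain picklable data instead.
@task
def run_query(server: str, user: str, password: str, database: str, query: str):
    with pymssql.connect(server=server, user=user, password=password, database=database) as conn:
        with conn.cursor() as cursor:
            cursor.execute(query)
            return cursor.fetchall()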
Hwi Moon
01/11/2022, 6:09 PM

Daniel Kornhauser
01/11/2022, 6:13 PM
query {
  flow_run {
    logs {
      message
      level
      info
      created
    }
  }
}
Of course, a totally different query to get error logs from GraphQL would also be welcome.
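An untested sketch of a narrower query, assuming the Hasura-style filter arguments Prefect Cloud's GraphQL API exposes (the exact fields can be checked in the Interactive API tab; <flow-run-id> is a placeholder):

query {
  flow_run(where: { id: { _eq: "<flow-run-id>" } }) {
    logs(
      where: { level: { _eq: "ERROR" } }
      order_by: { created: asc }
    ) {
      message
      level
      created
    }
  }
}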
Matthew Seligson
01/11/2022, 7:18 PM

Heeje Cho
01/11/2022, 9:51 PM

Leon Kozlowski
01/11/2022, 10:31 PM

Sidney Fox
01/11/2022, 11:13 PM
Failed to scan table: Unable to locate credentials
I have AWS_CREDENTIALS stored in Prefect Cloud as a Secret, and I've tried passing the credentials as envs to KubernetesRun:
env={
    "ACCESS_KEY": Secret("AWS_CREDENTIALS").get().get("ACCESS_KEY"),
    "SECRET_ACCESS_KEY": Secret("AWS_CREDENTIALS").get().get("SECRET_ACCESS_KEY")
}
That returns the same error. What's the best / preferred approach to authenticate a Kubernetes agent against AWS?
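One thing worth checking (a sketch, not a confirmed diagnosis): boto3 only discovers credentials under the standard names AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, so the keys above would be invisible to it even when the values are correct:

from prefect.client import Secret
from prefect.run_configs import KubernetesRun

creds = Secret("AWS_CREDENTIALS").get()  # assumes the Secret holds these two keys
flow.run_config = KubernetesRun(
    env={
        # botocore looks for these exact variable names.
        "AWS_ACCESS_KEY_ID": creds["ACCESS_KEY"],
        "AWS_SECRET_ACCESS_KEY": creds["SECRET_ACCESS_KEY"],
    }
)

On EKS, attaching an IAM role to the job's service account (IRSA) avoids putting long-lived keys in environment variables at all.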
Jeff Wiens
01/12/2022, 12:42 AM

M. Siddiqui
01/12/2022, 8:33 AM

Dekel R
01/12/2022, 10:07 AM
if pendulum.today('America/New_York').weekday() == 2:  # Monday is 0, so Wednesday is 2
    x_flow = create_flow_run(flow_name=PREFECT_TRAIN_FLOW_NAME, project_name=PREFECT_TRAIN_PROJECT_NAME)
    wait_for_flow_a = wait_for_flow_run(x_flow, raise_final_state=True)
This code is of course inside my "with Flow…" block.
When running this code alone (in a dummy flow), it works and x_flow gets invoked.
But when running it in my real flow, after some other tasks, nothing happens.
I cannot even see the "wait_for_flow_run" task in Prefect Cloud (flow -> tasks tab); it seems to be getting ignored.
What am I missing here?
Thanks
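A plausible explanation (hedged, since the full flow isn't shown): the bare if runs once, when the flow is built and registered, not on each scheduled run, so unless registration happened on a Wednesday the two tasks are never added to the DAG at all, which would match them not appearing in the tasks tab. Prefect 1.x's case construct defers the check to run time; a minimal sketch reusing the names from the snippet above:

import pendulum
from prefect import Flow, case, task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

@task
def is_wednesday() -> bool:
    # Evaluated once per flow run, not at registration time.
    return pendulum.today("America/New_York").weekday() == 2

with Flow("parent-flow") as flow:
    with case(is_wednesday(), True):
        # Constants below come from the original snippet.
        x_flow = create_flow_run(
            flow_name=PREFECT_TRAIN_FLOW_NAME,
            project_name=PREFECT_TRAIN_PROJECT_NAME,
        )
        wait_for_flow_a = wait_for_flow_run(x_flow, raise_final_state=True)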
Ram Vuppaladadiyam
01/12/2022, 12:44 PM

Tony Liberato
01/12/2022, 2:50 PM

Justin Green
01/12/2022, 3:00 PM

Rouven
01/12/2022, 4:11 PM

Qwame
01/12/2022, 4:21 PM

Suresh R
01/12/2022, 4:57 PM

Jason May
01/12/2022, 5:30 PM

Didier Marin
01/12/2022, 5:32 PM

Jawaad Mahmood
01/12/2022, 6:46 PM
### THIS CODE ALLOWS ME TO BIND A LOCAL FILE PATH
from prefect.run_configs import DockerRun
import docker

flow.run_config = DockerRun(
    labels=['my-label'],
    host_config={
        'mounts': [
            docker.types.Mount(
                target='/public',
                source=r'//c/some/local/path',
                type='bind',
            )
        ]
    },
)

### THIS CODE THROWS ERROR
flow.run_config = DockerRun(
    labels=['my-label'],
    host_config={
        'mounts': [
            docker.types.Mount(
                target='/public',
                source=r'\\path\to\windows\network\shared\drive',
                type='bind',
            )
        ]
    },
)
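A hedged guess at the cause: a bind mount requires a source path that already exists on the Docker host's filesystem, and a UNC path to a network share usually doesn't. One workaround is a named volume backed by CIFS/SMB, mounted by name instead of by path (server, share, and credentials below are hypothetical):

import docker

client = docker.from_env()
# The volume's data lives on the SMB share; Docker mounts it via CIFS.
client.volumes.create(
    name="shared-drive",
    driver="local",
    driver_opts={
        "type": "cifs",
        "device": "//server/share",            # UNC path written with forward slashes
        "o": "username=<user>,password=<pw>",  # hypothetical credentials
    },
)

The mount then becomes docker.types.Mount(target='/public', source='shared-drive', type='volume').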
Chris Reuter
01/12/2022, 7:57 PM

Filipe Reis
01/12/2022, 8:32 PM

Christoph Deil
01/12/2022, 9:39 PM
schedule = IntervalSchedule(interval=datetime.timedelta(seconds=10))
deployment_spec = DeploymentSpec(name="hola", flow=greetings_flow, schedule=schedule)
Do I now use OrionClient and some methods to deploy?
We currently use Prefect Core in a pod and simply do flow.run() with a schedule attached, and I'm looking for a working example of the equivalent in Orion (even if, as I gather, behind the scenes it will do something else via a server and DB).
Basically I'm looking for this: https://orion-docs.prefect.io/concepts/deployments/#running-deployments-with-the-api
🙂
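For the Orion alphas current at the time, the missing step was the CLI rather than OrionClient; roughly (a sketch only, since command names changed between alpha releases, so check the linked docs for your version):

prefect deployment create ./greetings_flow.py   # registers the DeploymentSpec(s) in the file
prefect orion start                             # starts the API server and scheduler
prefect agent start                             # depending on the release, an agent may be needed to execute scheduled runs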
Daniel Komisar
01/12/2022, 10:10 PM
Is created (or any other field) guaranteed to be returned in the same order? Thanks!
Jason Motley
01/12/2022, 10:54 PM
df_2015 = extract_past(connection, start_date="2015-01-01", end_date="2015-12-31", task_args={"name": "Extract 2015"})
Kevin Kho
01/12/2022, 10:57 PM

Jason Motley
01/12/2022, 10:58 PM

Kevin Kho
01/12/2022, 11:03 PM

Jason Motley
01/12/2022, 11:04 PM